airflow priority_weight 计算方法

2017-10-12 MoreFreeze 更多博文 » 博客 » GitHub »

airflow python

原文链接 https://morefreeze.github.io/2017/10/airflow-priority_weight.html
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。


{% include JB/setup %}

最近发现 airflow 任务执行顺序有些奇怪,于是看了下 airflow 关于权重的处理,解答了心中的疑问。

以最新的稳定版(v1.8-stable)为例,代码总共就这么一小段

    @property
    def priority_weight_total(self):
        return sum([
            t.priority_weight
            for t in self.get_flat_relatives(upstream=False)
        ]) + self.priority_weight

<!--more-->

这是类 Operator 下的一个属性,同时可以发现在 airflow 里,Task 和 Operator 概念是互通的, (可以看到许多地方传参都是一个 task,实际传的都是 Operator。注意区别 Task 和 TaskInstance) 计算方法就是把所有下游(依赖它的)任务的权重和自己的权重加起来,get_flat_relatives 就是递归地遍历所有下游任务,返回一个数组,顺便说下upstream=True就是遍历所有上游任务。

因此,可以得到一个结论,任务依赖层级越多,越容易出现权重大的任务,这也就解释了为什么我设置了 t1 >> t2权重都是7,又设置了r1 >> r2 >> r3 权重都是5,t1的权重却比r1的权重小。

源码面前,了无秘密 —— 侯捷