airflow priority_weight 计算方法
原文链接 https://morefreeze.github.io/2017/10/airflow-priority_weight.html
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
{% include JB/setup %}
最近发现 airflow 任务执行顺序有些奇怪,于是看了下 airflow 关于权重的处理,解答了心中的疑问。
以最新的稳定版(v1.8-stable)为例,代码总共就这么一小段
@property
def priority_weight_total(self):
return sum([
t.priority_weight
for t in self.get_flat_relatives(upstream=False)
]) + self.priority_weight
<!--more-->
这是类 Operator 下的一个属性,同时可以发现在 airflow 里,Task 和 Operator 概念是互通的,
(可以看到许多地方传参都是一个 task,实际传的都是 Operator。注意区别 Task 和 TaskInstance)
计算方法就是把所有下游(依赖它的)任务的权重和自己的权重加起来,get_flat_relatives
就是递归地遍历所有下游任务,返回一个数组,顺便说下upstream=True
就是遍历所有上游任务。
因此,可以得到一个结论,任务依赖层级越多,越容易出现权重大的任务,这也就解释了为什么我设置了
t1 >> t2
权重都是7,又设置了r1 >> r2 >> r3
权重都是5,t1
的权重却比r1
的权重小。
源码面前,了无秘密 —— 侯捷