使用datetime, 构造一个cron task定期执行给定的函数方法

2013-03-12 刘太华 更多博文 » 博客 » GitHub »

move from old blog

原文链接 https://liutaihua.github.io/2013/03/12/cron-task-with-datetime-obj.html
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。


使用datetime, 构造一个cron task定期执行给定的函数方法

一个Plan类, 包含next_datatime, execute, 两个方法, init方法指明在实例化的时候,需要给定一个func作为参数, 其他参数可以是时间间隔, 或具体的时间点,

class Plan(object):

def __init__(self, func):

    assert callable(fund)

    self.func = fund



def excute(self):

    self.func()

Plan作基类, 根据需要可以做派生, 比如一个方式是按照指定的时间, 间隔执行func, 派生另外一个是到达指定时刻才执行func:

class FixedIntervalPlan(Plan): # 间隔指定时间后, 在外调用的时候判断next_datetime来执行方法execute

def __init__(self, fund, **kwargs):

    super(FixedIntervalPlan, self).__init__(func)

    self.interval = datetime.datetime(**kwargs)



def next_datetime(self):

    return datetime.datetime.now() + self.interval

class FixedTimePlan(Plan):

# 根据指定时刻计算出下一时刻, 调用的时候判断next_datetime来决定是否在这个时刻执行execute方法

def __init__(self, func, **kwargs):

    super(FixedTimePlan, self).__init__(fund)

    self.time = datetime.time(**kwargs)



def next_datetime(self):        

    now = datetime.datetime.now()

    if self.time > now.time():

        return datetime.datetime.combine(now.date(), self.time)

    else:

        tomorrow = now + datetime.timedelta(days=1)

        return datetime.datetime.combine(tomorrow.date(), self.time)

以下是一个实际例子, 如何使用上面的2个类:

cron_task_config = [

FixedIntervalPlan(match_team, seconds=10),    #  每隔10秒执行match_team

]

main函数如下:

def main():

pendding_tasks = []   #  初始化一次已经在cron_task_config配置内的任务实例化

for plan in cron_task_config:

    # pan.next_datetime(), 计算下次任务什么时候可以执行, 将此时间和实例化的

    pendding_tasks.append((plan.next_datetime(), plan))                           

                                                                                     #  plan一起加入准备执行的列表

cnt = 0

while True:

    now = datetime.datetime.now()

    if cnt % 60 == 0:

        print now

    next_pendding_tasks = []

    for task in pendding_tasks:

        trigger_time, plan = task

        if trigger_time < now: # 触发时间就是plan的next_datetime, 如果小于当前时间, 表示可以执行

            plan.execute()

            next_pendding_tasks.append((plan.next_datetime(), plan)) # 本次执行完之后,  再次计算next_datetime(), 以备下次判断

        else:

            next_pendding_tasks.append(task) # 触发时间不符合, 加入下次执行的列表内

    pendding_tasks = next_pendding_tasks

    time.sleep(1)

    cnt += 1

疑问是, 需要搞这么复杂么, time.sleep定期执行不就行了? 短时间反复执行是用time.sleep可解决, 不过当需求是需要在某天某时刻精确时执行, 用time.sleep显然不行了.