python任务调度第三方模块schedule

概述

schedule 是一个 Python 任务调度模块,非常轻量,不需要额外的依赖,可以使用友好的语法定时调用 Python 函数或其他可调用对象。
Schedule 并不是一个放之四海而皆准的任务调度库,此库旨在为简单的调度问题提供简单的解决方案。如果碰到如下情况,请使用其他更有效的方法:

  1. 任务持久化(程序重启后仍然保留调度计划)
  2. 精确的计时(如亚秒级 sub-second 精度)
  3. 并发执行(多线程)
  4. 本地化(时区、工作日或节假日)

相关文档

安装

pip install schedule

基本用法

定时任务

import schedule
import time

def job():
    print("I'm working...")

# Run the job every 3 seconds / minutes / hours / days / weeks
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run the job at the 23rd second of every minute
# schedule.every().minute.at(":23").do(job)

# Run the job at the 42nd minute of every hour
schedule.every().hour.at(":42").do(job)

# Run the job every 5th hour, 20 minutes and 30 seconds in
# e.g. if the current time is 02:00, the first run is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run the job every day at a specific time
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run the job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

# Registered jobs only run when run_pending() is called, so poll it
# once per second for as long as the program lives.
while True:
    schedule.run_pending()
    time.sleep(1)

注册后的周期任务需要通过 run_pending 函数来检查并执行已到期的任务,因此需要用一个 while 循环不断地轮询调用这个函数

使用装饰器调度

from schedule import repeat, every, run_pending
import time

@repeat(every(3).seconds)
def job():
    print("I'm working...")

while True:
    run_pending()
    time.sleep(1)

参数传递

import schedule
import time

def greet(name):
    print(f"hello {name}")

schedule.every(3).seconds.do(greet, 'jerry')

while True:
    schedule.run_pending()
    time.sleep(1)
from schedule import repeat, every, run_pending
import time

@repeat(every(3).seconds, 'jerry')
def greet(name):
    print(f"hello {name}")

while True:
    run_pending()
    time.sleep(1)

只执行一次任务

  • 取消任务cancel_job()
import schedule


def some_task():
    print('Hello world')


job = schedule.every(3).seconds.do(some_task)
schedule.cancel_job(job)
  • 只执行一次任务CancelJob
import schedule
import time

def job_that_executes_once():
    """Print a message, then return ``schedule.CancelJob`` so the job is not run again."""
    print('job executes once')
    return schedule.CancelJob

schedule.every(3).seconds.do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
  • 取消所有任务clear()
import time
import schedule


def greet(name):
    print('Hello {}'.format(name))
    # return schedule.CancelJob

schedule.every().second.do(greet, 'jerry')
schedule.clear()

while True:
    schedule.run_pending()
    time.sleep(1)

获取任务

import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)
schedule.every().minutes.do(hello)
all_jobs = schedule.get_jobs()
print(all_jobs)

任务标签tag

  • 打tag
import schedule

def greet(name):
    print('Hello {}'.format(name))

tag = schedule.every().seconds.do(greet, 'jerry').tag('tag1', 'tag2')
print(tag)
  • 根据tag过滤并取消任务
import schedule

def greet(name):
    print('Hello {}'.format(name))

tag1 = schedule.every().seconds.do(greet, 'jerry').tag('seconds-tasks', 'myself')
tag2 = schedule.every().minutes.do(greet, 'tom').tag('minutes-tasks', 'friend')
print(tag1)
print(tag2)
print(schedule.get_jobs())
schedule.clear('friend')
print(schedule.get_jobs())

随机时间间隔定时执行

  • every(A).to(B).seconds:每隔N秒执行一次,其中N在每次调度时于A到B之间随机选取(A<=N<=B)
import schedule
import datetime

def my_job():
    print(f'{datetime.datetime.now()}: 执行my_job')

import time  # local to this snippet: needed for the sleep in the polling loop

# Run my_job at a random interval between 1 and 10 seconds.
schedule.every(1).to(10).seconds.do(my_job)

while True:
    schedule.run_pending()
    # Pause between polls; without it the loop busy-spins a CPU core.
    time.sleep(1)

指定deadline时间内执行任务until

import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# `time` is bound to datetime.time in this snippet, so import sleep by name.
from time import sleep

# NOTE: until() rejects a deadline that is already in the past (it raises
# schedule.ScheduleValueError), so every deadline below must still lie in
# the future at the moment the job is registered.

# Run every second until 11:35 today (register before 11:35)
schedule.every(1).seconds.until("11:35").do(job)

# Run every hour until 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# Run every hour for the next 8 hours, then stop
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run every hour until 11:33:42 today (register before 11:33:42)
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Run every hour until an absolute datetime (must be a future moment)
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)

while True:
    schedule.run_pending()
    # Pause between polls; without it the loop busy-spins a CPU core.
    sleep(1)

获取距离下次任务执行的秒数idle_seconds()

  • 如果没有任务则返回None;如果下次任务的计划执行时间已经过去,则返回负数
import schedule
import time

def job():
    print('Hello')

schedule.every(5).seconds.do(job)

while 1:
    # Seconds until the next scheduled run; None when no job is registered
    n = schedule.idle_seconds()
    # print(n)
    if n is None:
        # Nothing left to run: leave the loop
        break
    elif n > 0:
        # Sleep until the next job is due instead of polling every second
        time.sleep(n)
    # n <= 0 means a job is already due, so run it right away
    schedule.run_pending()

立即运行所有任务

  • 如果需要在某个条件触发时立即运行所有任务(不管它们原定的执行时间),可以调用 schedule.run_all()
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all(delay_seconds=3)  # delay_seconds: pause between two consecutive jobs
# while True:
#     schedule.run_all(delay_seconds=3)

进阶用法

后台执行

  • 使用子线程来执行定时任务
import threading
import time

import schedule


def run_continuously(interval=1):
    """Run pending jobs on a background thread until asked to stop.

    A new thread is started that calls ``schedule.run_pending()`` once per
    ``interval`` seconds.

    Note that missed runs are intentionally *not* made up for. For example,
    if a job is registered to run every minute and ``interval`` is one hour,
    the job runs once per hour, not 60 times at each interval.

    :param interval: seconds to wait between two ``run_pending()`` calls.
    :return: a ``threading.Event``; call ``set()`` on it to stop the
        background loop.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        # ``run`` is an ordinary instance method, as Thread expects.
        def run(self):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                # Wait on the event instead of time.sleep(): the wait ends
                # as soon as set() is called, so stopping does not have to
                # sit out the rest of a (possibly long) interval.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run


def background_job():
    print('Hello from the background thread')


schedule.every().second.do(background_job)

# Start the background thread
stop_run_continuously = run_continuously()

# Do some other things...
time.sleep(10)

# Stop the background thread
stop_run_continuously.set()

并行执行

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    """Start ``job_func`` on a new thread and return without waiting for it."""
    threading.Thread(target=job_func).start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
import time
import threading
import schedule
import queue

def job():
    print("I'm working")


def worker_main():
    """Loop forever: take a callable from ``jobqueue``, call it, mark it done."""
    while True:
        pending_job = jobqueue.get()  # blocks until an item is available
        pending_job()
        jobqueue.task_done()

jobqueue = queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

可进一步参考官方文档,查看应用场景

异常处理

import functools
import schedule

def catch_exceptions(cancel_on_failure=False):
    """Build a decorator that keeps a failing job from raising.

    The wrapped job's return value is passed through unchanged. If the job
    raises an ``Exception``, the traceback is printed and the wrapper
    returns ``schedule.CancelJob`` when ``cancel_on_failure`` is true,
    otherwise ``None``.

    :param cancel_on_failure: return ``schedule.CancelJob`` after a failure.
    :return: a decorator to apply to a job function.
    """
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            # Catch Exception only: a bare ``except`` would also swallow
            # KeyboardInterrupt and SystemExit, making the program hard to
            # stop while a job is running.
            except Exception:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

日志处理

import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)
schedule.run_all()
schedule.clear()
  • 自定义logging
import functools
import time
import schedule

# This decorator can be applied to any job function to log the elapsed time of each job
def print_elapsed_time(func):
    """Decorate ``func`` so that each call prints its start and its duration.

    The wrapper prints a line before calling ``func`` and another after it
    returns, reporting the elapsed time in whole seconds. The return value
    of ``func`` is passed through unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Use the monotonic clock: unlike time.time() it is not affected by
        # system clock adjustments, so the measured duration stays correct.
        started_at = time.monotonic()
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        elapsed = time.monotonic() - started_at
        print('LOG: Job "%s" completed in %d seconds' % (func.__name__, elapsed))
        return result

    return wrapper


@print_elapsed_time
def job():
    print('Hello, Logs')
    time.sleep(5)

schedule.every().second.do(job)
schedule.run_all()

多个任务

import time
import schedule

def fooJob():
    print("Foo")

def barJob():
    print("Bar")

# Create the first scheduler
scheduler1 = schedule.Scheduler()

# Register jobs on this scheduler
scheduler1.every().hour.do(fooJob)
scheduler1.every().hour.do(barJob)

# A second scheduler with its own jobs
scheduler2 = schedule.Scheduler()
scheduler2.every().second.do(fooJob)
scheduler2.every().second.do(barJob)

while True:
    # run_pending() has to be called on every scheduler
    scheduler1.run_pending()
    scheduler2.run_pending()
    time.sleep(1)

参考