python任务调度第三方模块schedule
概述
schedule 是一个非常轻量的 Python 任务调度模块,不需要额外的依赖,可以使用友好的语法来定时调用 Python 函数或其他可调用对象。
Schedule 并不是一个放之四海而皆准的任务调度库,此库旨在为简单的调度问题提供简单的解决方案。如果碰到如下情况,请使用其他更有效的方法:
- 任务持久化(在程序重启之间保留调度计划)
- 精确的计时任务(如sub-second级)
- 并发执行(多线程)
- 本地化(时区、工作日或节假日)
相关文档
安装
pip install schedule
基本用法
定时任务
import schedule
import time
def job():
    """Example task: print a message each time the scheduler runs it."""
    print("I'm working...")


# Run the job every 3 seconds / minutes / hours / days / weeks.
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run the job every minute at the 23rd second.
# schedule.every().minute.at(":23").do(job)

# Run the job every hour at the 42nd minute.
schedule.every().hour.at(":42").do(job)

# Run the job every 5 hours, 20 minutes and 30 seconds into the hour.
# If the current time is 02:00, the first run is at 06:20:30.
schedule.every(5).hours.at("20:30").do(job)

# Run the job every day at a specific time.
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run the job on a specific day of the week.
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

# Jobs only fire when run_pending() is called, so poll it in a loop.
while True:
    schedule.run_pending()
    time.sleep(1)
注册后的周期任务需要通过 run_pending 函数来检查并执行已到期的任务,因此需要用一个 while 循环不断轮询这个函数
使用装饰器调度
from schedule import repeat, every, run_pending
import time
@repeat(every(3).seconds)
def job():
    """Example task scheduled every 3 seconds through the ``@repeat`` decorator."""
    print("I'm working...")


# Decorated jobs still need run_pending() to be polled.
while True:
    run_pending()
    time.sleep(1)
参数传递
import schedule
import time
def greet(name):
    """Print a greeting for ``name``."""
    print(f"hello {name}")


# Arguments after the job function are passed to it on every run.
schedule.every(3).seconds.do(greet, 'jerry')

while True:
    schedule.run_pending()
    time.sleep(1)
from schedule import repeat, every, run_pending
import time
# Arguments after the schedule expression are passed to the decorated job.
@repeat(every(3).seconds, 'jerry')
def greet(name):
    """Print a greeting for ``name``."""
    print(f"hello {name}")


while True:
    run_pending()
    time.sleep(1)
取消任务与只执行一次任务
- 取消任务cancel_job()
import schedule
def some_task():
    """Example task that prints a fixed message."""
    print('Hello world')


# do() returns the Job object, which can later be handed to cancel_job().
job = schedule.every(3).seconds.do(some_task)
schedule.cancel_job(job)
- 只执行一次任务CancelJob
import schedule
import time
def job_that_executes_once():
    """Print a message, then return ``schedule.CancelJob`` so the job is unscheduled."""
    print('job executes once')
    return schedule.CancelJob


schedule.every(3).seconds.do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
- 取消所有任务clear()
import time
import schedule
def greet(name):
    """Print a greeting for ``name``."""
    print('Hello {}'.format(name))
    # return schedule.CancelJob


schedule.every().second.do(greet, 'jerry')

# Remove every job from the default scheduler; the loop below then has nothing to run.
schedule.clear()

while True:
    schedule.run_pending()
    time.sleep(1)
获取任务
import schedule
def hello():
    """Example task that prints a fixed message."""
    print('Hello world')


schedule.every().second.do(hello)
schedule.every().minutes.do(hello)

# get_jobs() returns the jobs currently registered on the default scheduler.
all_jobs = schedule.get_jobs()
print(all_jobs)
任务标签tag
- 打tag
import schedule
def greet(name):
    """Print a greeting for ``name``."""
    print('Hello {}'.format(name))


# tag() attaches one or more labels to the job; the chained call's result is printed.
tag = schedule.every().seconds.do(greet, 'jerry').tag('tag1', 'tag2')
print(tag)
- 根据tag过滤并取消任务
import schedule
def greet(name):
    """Print a greeting for ``name``."""
    print('Hello {}'.format(name))


tag1 = schedule.every().seconds.do(greet, 'jerry').tag('seconds-tasks', 'myself')
tag2 = schedule.every().minutes.do(greet, 'tom').tag('minutes-tasks', 'friend')
print(tag1)
print(tag2)

# Print the job list before and after cancelling everything tagged 'friend'.
print(schedule.get_jobs())
schedule.clear('friend')
print(schedule.get_jobs())
随机时间间隔定时执行
- every(A).to(B).seconds:每隔 N 秒执行一次,N 为随机值($A \le N \le B$)
import schedule
import datetime
import time


def my_job():
    """Print the current timestamp each time the job runs."""
    print(f'{datetime.datetime.now()}: 执行my_job')


# Run at a random interval between 1 and 10 seconds.
schedule.every(1).to(10).seconds.do(my_job)

while True:
    schedule.run_pending()
    # Sleep between polls so the loop does not spin a CPU core.
    time.sleep(1)
指定deadline时间内执行任务until
import schedule
from datetime import datetime, timedelta, time
# The name ``time`` is bound to datetime.time in this snippet, so import sleep directly.
from time import sleep


def job():
    """Example task that prints a short message."""
    print('Boo')


# NOTE: until() raises schedule.ScheduleValueError when the deadline is already
# in the past, so the time-of-day deadlines below only work before that time today.

# Run every second until 11:35 today.
schedule.every(1).seconds.until("11:35").do(job)

# Run every hour until 2030-01-01 18:33.
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# Run every hour for the next 8 hours.
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run every hour until 11:33:42 today.
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Run every hour until a specific datetime, which must lie in the future.
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)

while True:
    schedule.run_pending()
    # Sleep between polls so the loop does not spin a CPU core.
    sleep(1)
获取距离下次任务执行的秒数idle_seconds()
- 如果没有任务则返回None,如果下次任务发生在过去时间则为负数
import schedule
import time
def job():
    """Example task that prints a short message."""
    print('Hello')


schedule.every(5).seconds.do(job)

while 1:
    # Seconds until the next job is due.
    n = schedule.idle_seconds()
    # print(n)
    if n is None:
        # No jobs are scheduled any more.
        break
    elif n > 0:
        # Sleep exactly until the next job is due.
        time.sleep(n)
    schedule.run_pending()
立即运行所有任务
- 如果某个机制触发了,需要立即运行所有任务,可以调用 schedule.run_all()
import schedule
def job_1():
    """Example task that prints 'Foo'."""
    print('Foo')


def job_2():
    """Example task that prints 'Bar'."""
    print('Bar')


schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

# Run every job right away regardless of its schedule;
# delay_seconds is the pause between two consecutive jobs.
schedule.run_all(delay_seconds=3)
# while True:
#     schedule.run_all(delay_seconds=3)
进阶用法
后台执行
- 使用子线程来执行定时任务
import threading
import time
import schedule
def run_continuously(interval=1):
    """Run pending jobs on a background thread until told to stop.

    The thread calls ``schedule.run_pending()`` and then waits ``interval``
    seconds, repeatedly, until the returned event is set.

    Please note that it is *intended behavior that run_continuously() does
    not run missed jobs*. For example, if you've registered a job that
    should run every minute and you set a continuous run interval of one
    hour then your job won't be run 60 times at each interval but only once.

    @param interval: seconds to wait between two polls of the scheduler.
    @return cease_continuous_run: threading.Event which can be set to
        cease the continuous run.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        def run(self):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                # Waiting on the event instead of sleeping lets the thread
                # exit as soon as the event is set, not after a full interval.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run
def background_job():
    """Example task run by the scheduler's background thread."""
    print('Hello from the background thread')


schedule.every().second.do(background_job)

# Start the background thread
stop_run_continuously = run_continuously()

# Do some other things...
time.sleep(10)

# Stop the background thread
stop_run_continuously.set()
并行执行
import threading
import time
import schedule
def job():
    """Print which thread the job is currently running on."""
    print("I'm running on thread %s" % threading.current_thread())


def run_threaded(job_func):
    """Start ``job_func`` in a new thread and return without waiting for it."""
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
# Each scheduled entry only starts a thread, so the five jobs run in parallel.
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
import time
import threading
import schedule
import queue
def job():
    """Example task that prints a short message."""
    print("I'm working")


def worker_main():
    """Take job functions off ``jobqueue`` and run them one at a time, forever."""
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()


jobqueue = queue.Queue()

# The scheduler only enqueues the job function; the worker thread executes it.
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)
可进一步参考官方文档,查看应用场景
异常处理
import functools
import schedule
def catch_exceptions(cancel_on_failure=False):
    """Build a decorator that keeps a failing job from crashing the scheduler loop.

    The wrapped job returns its normal result on success. If it raises an
    ``Exception``, the traceback is printed and the wrapper returns
    ``schedule.CancelJob`` when ``cancel_on_failure`` is true, otherwise
    ``None``. ``SystemExit`` and ``KeyboardInterrupt`` are not caught, so
    the program can still be stopped.

    @param cancel_on_failure: return ``schedule.CancelJob`` after a failure.
    @return: a decorator to apply to job functions.
    """
    import traceback

    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
    """Example job that always fails with ZeroDivisionError."""
    return 1 / 0


schedule.every(5).minutes.do(bad_task)
日志处理
import schedule
import logging
# Turn on DEBUG output for the logger named 'schedule'.
logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
schedule_logger.setLevel(level=logging.DEBUG)


def job():
    """Example task that prints a short message."""
    print("Hello, Logs")


schedule.every().second.do(job)

schedule.run_all()

schedule.clear()
- 自定义logging
import functools
import time
import schedule
# This decorator can be applied to any job function to log the elapsed time of each job
def print_elapsed_time(func):
    """Wrap ``func`` so each call prints a start line and its elapsed time.

    The elapsed time is printed with ``%d``, i.e. as whole seconds. The
    wrapper returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_timestamp = time.time()
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed in %d seconds' % (func.__name__, time.time() - start_timestamp))
        return result

    return wrapper
@print_elapsed_time
def job():
    """Example task that prints a message and then sleeps for 5 seconds."""
    print('Hello, Logs')
    time.sleep(5)


schedule.every().second.do(job)

schedule.run_all()
多个调度器
import time
import schedule
def fooJob():
    """Example task that prints 'Foo'."""
    print("Foo")


def barJob():
    """Example task that prints 'Bar'."""
    print("Bar")


# Create a scheduler.
scheduler1 = schedule.Scheduler()

# Add jobs to that scheduler.
scheduler1.every().hour.do(fooJob)
scheduler1.every().hour.do(barJob)

# A second scheduler with its own jobs.
scheduler2 = schedule.Scheduler()
scheduler2.every().second.do(fooJob)
scheduler2.every().second.do(barJob)

while True:
    # run_pending() has to be called on every scheduler.
    scheduler1.run_pending()
    scheduler2.run_pending()
    time.sleep(1)