安装
- pip3 install -i https://pypi.doubanio.com/simple apscheduler
使用示例代码
间隔性任务
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Author : xiao qiang
# @WeChat : xiaoqiangclub
# @Software : PyCharm
# @File : APSchedulerTasks.py
# @Time : 2021/6/22 21:59
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
class APSchedulerTasks:
def intervalTask(self, task_func, block=True, task_id='myIntervalJob', **args):
"""
每隔一定时间执行的任务
:param task_func: 任务函数对象,不带括号
:param block: 默认为True:使用BlockingScheduler()阻塞调度器;False:使用后台运行调度器
:param args: 不设置就每秒执行一次!IntervalTrigger的参数即间隔运行的时间:weeks=0, days=0, hours=0, minutes=0, seconds=0等
:param task_id: 任务id
:return:
"""
if block:
scheduler = BlockingScheduler() # 阻塞性调度器
else:
scheduler = BackgroundScheduler() # 后台运行调度器
trigger = IntervalTrigger(**args) # 触发器
scheduler.add_job(task_func, trigger=trigger, id=task_id)
# scheduler.add_job(task_func, 'interval', seconds=1, id=task_id) # 合并写法
try:
scheduler.start()
except Exception as ret:
print('intervalTask错误:', str(ret))
print('intervalTask:我没被阻塞!!!')
while True:
time.sleep(1)
if __name__ == '__main__':
from datetime import datetime
def task():
print('现在时间:{}'.format(datetime.now()))
test = APSchedulerTasks()
test.intervalTask(task, seconds=2, block=False)
- trigger触发器可以分开写也可以直接合并写,如上示例
自己封装了一个常用功能
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Author : xiao qiang
# @WeChat : xiaoqiangclub
# @Software : PyCharm
# @File : APSchedulerTasks.py
# @Time : 2021/6/22 21:59
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.date import DateTrigger
class APSchedulerTasks:
def run(self, task_func, trigger='interval', block=True, task_id='myIntervalJob', thread_pool_executor=True,
pool_count=10, **args):
"""
每隔一定时间执行的任务
:param task_func: 任务函数对象,不带括号
:param trigger: 触发器类型,默认:'interval',还可以是:'date','cron';文档:https://pypi.org/search/?q=triggers
:param block: 默认为True:使用BlockingScheduler()阻塞调度器;False:使用后台运行调度器
:param args: 触发器对应的触发参数
:param task_id: 任务id
:param thread_pool_executor: 设置执行器,默认为ThreadPoolExecutor(),设置为False:ProcessPoolExecutor()
:param pool_count: 最大线程进程数
:return:
"""
if thread_pool_executor: # 执行器设置
executors = {
'default': ThreadPoolExecutor(pool_count)
}
else:
executors = {
'processpool': ProcessPoolExecutor(pool_count)
}
if block:
scheduler = BlockingScheduler(executors=executors) # 阻塞性调度器
else:
scheduler = BackgroundScheduler(executors=executors) # 后台运行调度器
scheduler.add_job(task_func, trigger, **args, id=task_id) # 合并写法
try:
scheduler.start()
except Exception as ret:
print('intervalTask错误:', str(ret))
print('intervalTask:我没被阻塞!!!')
# while True:
# time.sleep(1)
if __name__ == '__main__':
from datetime import datetime
def task():
print('现在时间:{}'.format(datetime.now()))
test = APSchedulerTasks()
test.run(task, seconds=2, block=False) # 间隔触发
# test.run(task, trigger='date', run_date='2021-6-23 14:59:00', block=False) # 固定时间触发
# test.run(task, trigger='cron', hour=14, minute=54, block=False) # cron规则触发