星驰编程网

免费编程资源分享平台_编程教程_代码示例_开发技术文章

「Python自学笔记」Python定时任务框架APScheduler(自用源码)

  • 安装
  • 使用示例代码
    • 间隔性任务
  • 自己封装了一个常用功能

安装

  • pip3 install -i https://pypi.doubanio.com/simple apscheduler

使用示例代码

间隔性任务

  • 每隔一定时间执行
#!/usr/bin/env python
# -*- encoding: utf-8 -*-                            
# @Author     : xiao qiang
# @WeChat     : xiaoqiangclub                              
# @Software   : PyCharm      
# @File       : APSchedulerTasks.py
# @Time       : 2021/6/22 21:59
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


class APSchedulerTasks:
    def intervalTask(self, task_func, block=True, task_id='myIntervalJob', **args):
        """
        每隔一定时间执行的任务
        :param task_func: 任务函数对象,不带括号
        :param block: 默认为True:使用BlockingScheduler()阻塞调度器;False:使用后台运行调度器
        :param args: 不设置就每秒执行一次!IntervalTrigger的参数即间隔运行的时间:weeks=0, days=0, hours=0, minutes=0, seconds=0等
        :param task_id: 任务id
        :return:
        """
        if block:
            scheduler = BlockingScheduler()  # 阻塞性调度器
        else:
            scheduler = BackgroundScheduler()  # 后台运行调度器

        trigger = IntervalTrigger(**args)  # 触发器
        scheduler.add_job(task_func, trigger=trigger, id=task_id)

        # scheduler.add_job(task_func, 'interval', seconds=1, id=task_id)   # 合并写法

        try:
            scheduler.start()
        except Exception as ret:
            print('intervalTask错误:', str(ret))

        print('intervalTask:我没被阻塞!!!')
        while True:
            time.sleep(1)


if __name__ == '__main__':
    from datetime import datetime


    def task():
        print('现在时间:{}'.format(datetime.now()))


    test = APSchedulerTasks()
    test.intervalTask(task, seconds=2, block=False)

  • trigger触发器可以分开写也可以直接合并写,如上示例

自己封装了一个常用功能

  • 下面是我根据个人使用情况封装的一个类
#!/usr/bin/env python
# -*- encoding: utf-8 -*-                            
# @Author     : xiao qiang
# @WeChat     : xiaoqiangclub                              
# @Software   : PyCharm      
# @File       : APSchedulerTasks.py
# @Time       : 2021/6/22 21:59
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.date import DateTrigger


class APSchedulerTasks:

    def run(self, task_func, trigger='interval', block=True, task_id='myIntervalJob', thread_pool_executor=True,
            pool_count=10, **args):
        """
        每隔一定时间执行的任务
        :param task_func: 任务函数对象,不带括号
        :param trigger: 触发器类型,默认:'interval',还可以是:'date','cron';文档:https://pypi.org/search/?q=triggers
        :param block: 默认为True:使用BlockingScheduler()阻塞调度器;False:使用后台运行调度器
        :param args: 触发器对应的触发参数
        :param task_id: 任务id
        :param thread_pool_executor: 设置执行器,默认为ThreadPoolExecutor(),设置为False:ProcessPoolExecutor()
        :param pool_count: 最大线程进程数
        :return:
        """
        if thread_pool_executor:  # 执行器设置
            executors = {
                'default': ThreadPoolExecutor(pool_count)
            }
        else:
            executors = {
                'processpool': ProcessPoolExecutor(pool_count)
            }

        if block:
            scheduler = BlockingScheduler(executors=executors)  # 阻塞性调度器
        else:
            scheduler = BackgroundScheduler(executors=executors)  # 后台运行调度器

        scheduler.add_job(task_func, trigger, **args, id=task_id)  # 合并写法

        try:
            scheduler.start()
        except Exception as ret:
            print('intervalTask错误:', str(ret))

        print('intervalTask:我没被阻塞!!!')
        # while True:
            # time.sleep(1)


if __name__ == '__main__':
    from datetime import datetime


    def task():
        print('现在时间:{}'.format(datetime.now()))


    test = APSchedulerTasks()
    test.run(task, seconds=2, block=False)  # 间隔触发
    # test.run(task, trigger='date', run_date='2021-6-23 14:59:00', block=False)  # 固定时间触发
    # test.run(task, trigger='cron', hour=14, minute=54, block=False)  # cron规则触发
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言