Pythongolang任务调度框架架 APScheduler 为何不执行

APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务,可以基于日期、时间间隔,及类似于云服务器Linux系统上的定时任务crontab类型的定时任务;该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。安装方式:pip install apschedulerpscheduler组件及简单说明:1、triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器2、job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,支持存储到MongoDB,Redis数据库中3、executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行4、schedulers(调度器):调度器是将其它部分联系在一起,对使用者提供接口,进行任务添加,设置,删除。实例1 -间隔性任务from datetime import datetimeimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass导入调度器模块 BlockingScheduler,这是最简单的调度器,调用 start 方阻塞当前进程,如果你的程序只用于调度,除了调度进程外没有其他后台进程,那么请用 BlockingScheduler 非常有用,此时调度进程相当于守护进程。定义一个函数 tick 代表我们要调度的作业程序。实例化一个 BlockingScheduler 类,不带参数表明使用默认的作业存储器-内存,默认的执行器是线程池执行器,最大并发线程数默认为 10 个(另一个是进程池执行器)。第 11 行添加一个作业 tick,触发器为 interval,每隔 3 秒执行一次,另外的触发器为 date,cron。date 按特定时间点触发,cron 则按固定的时间间隔触发。加入捕捉用户中断执行和解释器退出异常,pass 关键字,表示什么也不做。测试结果如下:实例2 -cron任务from datetime import datetimeimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'cron', hour=19,minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):passhour =19 , minute =23hour ='19', minute ='23'minute = '*/3' 表示每 5 分钟执行一次hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务配置调度器调度器的主要循环其实就是反复检查是不是到时需要执行的任务。询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点,如果时间点有多个,做 coalesce 检查。提交给执行器按时间点运行。在配置调度器前,我们首先要选取适合我们应用环境场景的调度器,存储器和执行器。下面是各调度器的适用场景:BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。AsyncIOScheduler:适用于使用了asyncio模块的应用程序。GeventScheduler:适用于使用gevent模块的应用程序。TwistedScheduler:适用于构建Twisted的应用程序。QtScheduler:适用于构建Qt的应用程序。上述调度器可以满足我们绝大多数的应用环境,本文以两种调度器为例说明如何进行调度器配置。接口云服务磁盘性能自动化测试定时任务讲解from apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_events, register_jobscheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.start()磁盘测试执行内容def RunDisk(taskuuid, taskTF=False):res = PDCreate(taskuuid)ret = res["ret"]if ret == 2:reportuuid = res["data"]["reportuuid"]task_mes = models.Performance_Disk.objects.all()task_mes.filter(taskuuid=taskuuid).update(reportuuid=reportuuid)if taskTF == True:disk_mes = models.Performance_Disk.objects.filter(taskuuid=taskuuid)disk_mes.update(TestStatus="执行中")return res添加定时任务def diskTask(taskuuid, day, hour):T1 = GetNewTime()# 月 天 时 分 秒new_time = T1.new_times(month=0, day=day, hours=0, minutes=0, seconds=0)end_date = str(new_time)disk_mes = models.Performance_Disk.objects.filter(taskuuid=taskuuid)disk_mes.update(duration=day, timing=hour, etime=end_date)scheduler.add_job(func=RunDisk, args=[taskuuid, True], trigger='cron',day_of_week='0-6', id=taskuuid, misfire_grace_time=30, max_instances=10,hour=hour, end_date=end_date)
最近一个程序要用到后台定时任务,看了看python后台任务,一般2个选择,一个是apscheduler,一个celery。先安装一下吧,最新版本的apscheduler是3.6.0版安装pip install apscheduler简单任务from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test():
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),'欢迎来到我的世界')
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,trigger='cron',second='*/10')
scheduler.start()定义一个函数,然后定义一个scheduler类型,添加一个job,然后执行,就可以了 10秒整倍数,就执行这个函数,是不是超级超级简单? 带参数的from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('欢迎来到我的世界',),trigger='cron',second='*/10')
scheduler.start()四个模块apscheduler分为4个模块,分别是Triggers,Job stores,Executors,Schedulers.从上面的例子我们就可以看出来了,triggers就是触发器,上面的代码中,用了cron,其实还有其他触发器,看看它的源码解释。The ``trigger`` argument can either be:
#. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword
arguments to this method are passed on to the trigger's constructor
#. an instance of a trigger class看见没有,源码中解释说,有date, interval, cron可供选择,其实看字面意思也可以知道,date表示具体的一次性任务,interval表示循环任务,cron表示定时任务,好了,分别写个代码看看效果最明显。定时任务、一次性任务、循环任务from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('定时任务',),trigger='cron',second='*/10')
scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3)
scheduler.start()其实应该不用我解释代码,大家也可以看出结果了,非常清晰。除了一次性任务,trigger是不要写的,直接定义next_run_time就可以了 日志from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
def aps_test(x):
print(1/0)
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('定时任务',),trigger='cron',second='*/5')
scheduler._logger = logging
scheduler.start()删除任务from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_remove(x):
scheduler.remove_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('定时任务',),trigger='cron',second='*/10',id='cron_task')
scheduler.add_job(func=aps_remove, args=('一次性任务,删除循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12),id='remove_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3,id='interval_task')
scheduler._logger = logging
scheduler.start()在运行过程中,成功删除某一个任务,其实就是为每个任务定义一个id,然后remove_job这个id,是不是超级简单,直观?那还有什么呢? 停止任务、恢复任务from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_remove(x):
scheduler.remove_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_pause(x):
scheduler.pause_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_resume(x):
scheduler.resume_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('定时任务',),trigger='cron',second='*/10',id='cron_task')
scheduler.add_job(func=aps_pause, args=('一次性任务,暂停循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12),id='pause_task')
scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=48),id='resume_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3,id='interval_task')
scheduler._logger = logging
scheduler.start()是不是很容易?好了,删除任务,停止任务,恢复任务就介绍到这,下面我们看看监听任务。 监听意外from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_remove(x):
scheduler.remove_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_pause(x):
scheduler.pause_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def aps_resume(x):
print(1/0) # 自定义错误
scheduler.resume_job('interval_task')
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def my_listerner(event):
if event.exception:
print('任务出错了!')
else:
print('任务正常运行中...')
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test,args=('定时任务',),trigger='cron',second='*/10',id='cron_task')
scheduler.add_job(func=aps_pause, args=('一次性任务,暂停循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12),id='pause_task')
scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=48),id='resume_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3,id='interval_task')
scheduler.add_listener(my_listerner,EVENT_JOB_ERROR
EVENT_JOB_EXECUTED)
scheduler._logger = logging
scheduler.start()但是最后在任务出错的时候参数信息并没有打印输出,不知道是啥原因呢?

我要回帖

更多关于 golang任务调度框架 的文章

 

随机推荐