from apscheduler.triggers.interval import IntervalTrigger
|
import datetime
|
import pytz
|
from apscheduler.schedulers.background import BackgroundScheduler
|
from db.models.tables import RequestTaskSetting
|
|
|
class TaskScheduler(object):
    """Scheduler that controls when the data-crawling tasks run.

    Wraps an APScheduler ``BackgroundScheduler``. At most one "request job"
    (the crawl job created from a ``RequestTaskSetting``) is tracked by id so
    that it can later be rescheduled or removed; ``add_task`` adds further
    interval jobs that are not tracked.
    """

    # Fixed id given to the job created by ``add_task_by_config``.
    _REQUEST_JOB_ID = "request_job"

    def __init__(self) -> None:
        self._scheduler = BackgroundScheduler()
        # Id of the tracked request job, or None when none is registered.
        self._request_job_id = None

    def add_task_by_config(self, task_func, config: RequestTaskSetting):
        """Add the crawl job described by a crawl-task configuration.

        The job runs ``task_func`` every ``config.request_interval_seconds``
        seconds between ``config.request_start_time`` and
        ``config.request_end_time``. Calling this again replaces the existing
        request job instead of failing on the duplicate id.

        Args:
            task_func: Callable executed on every run.
            config: Crawl-task settings providing interval, start and end time.
        """
        self._scheduler.add_job(
            task_func,
            'interval',
            seconds=config.request_interval_seconds,
            start_date=config.request_start_time,
            end_date=config.request_end_time,
            id=self._REQUEST_JOB_ID,
            # A run that was missed is still executed if it is at most
            # 600 seconds late.
            misfire_grace_time=600,
            # The id is fixed, so without this a second call would raise
            # ConflictingIdError.
            replace_existing=True,
        )
        # Only track the job once it has actually been added.
        self._request_job_id = self._REQUEST_JOB_ID

    def remove_job(self):
        """Remove the tracked request job, if there is one.

        APScheduler drops a job by itself once its trigger has no further fire
        time (e.g. after ``end_date``), so the removal is skipped when the job
        is already gone instead of raising JobLookupError. In every case the
        job is no longer tracked afterwards.
        """
        if self._request_job_id is None:
            return
        if self._scheduler.get_job(self._request_job_id) is not None:
            self._scheduler.remove_job(self._request_job_id)
        self._request_job_id = None

    def modify_task_by_config(self, config: RequestTaskSetting):
        """Reschedule the tracked request job with new crawl-task settings.

        ``modify_job`` only accepts Job attributes; interval/start/end are
        trigger arguments, so they have to go through ``reschedule_job``,
        which builds a new interval trigger for the job.
        Does nothing when no request job is tracked.

        Args:
            config: Crawl-task settings providing interval, start and end time.
        """
        if self._request_job_id is None:
            return
        self._scheduler.reschedule_job(
            self._request_job_id,
            trigger='interval',
            seconds=config.request_interval_seconds,
            start_date=config.request_start_time,
            end_date=config.request_end_time,
        )

    def add_task(self, task_func, start_date, intervel, end_date=None):
        """Add an interval job that is not tracked by this wrapper.

        Args:
            task_func: Callable executed on every run.
            start_date: Earliest time the job may run.
            intervel: Interval between runs, in seconds (parameter name kept
                for existing keyword callers).
            end_date: Latest time the job may run; None means no end.
        """
        self._scheduler.add_job(task_func, 'interval', seconds=intervel,
                                start_date=start_date, end_date=end_date)

    def start_task(self):
        """Start the background scheduler unless it is already running."""
        if not self._scheduler.running:
            self._scheduler.start()

    def shutdown_task(self):
        """Shut the scheduler down; a no-op when it is not running.

        ``shutdown`` raises SchedulerNotRunningError if the scheduler was never
        started or is already stopped, hence the guard.
        """
        if self._scheduler.running:
            self._scheduler.shutdown()

    def has_request_job(self):
        """Return True while a request job is tracked by this wrapper."""
        return self._request_job_id is not None

    def is_finish(self):
        """Return True when none of the scheduled jobs has a run left.

        A job whose ``next_run_time`` is None will not fire again (APScheduler
        removes jobs whose trigger is exhausted; a paused job also has no next
        run time). Jobs still pending, i.e. added before the scheduler was
        started, have not run yet and count as unfinished. Using these public
        attributes avoids building a "now" timestamp, whose timezone handling
        previously depended on the host's local offset.
        """
        return not any(
            job.pending or job.next_run_time is not None
            for job in self._scheduler.get_jobs()
        )
|