import datetime

import pytz
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

from db.models.tables import RequestTaskSetting

# NOTE(review): datetime, pytz and IntervalTrigger are no longer referenced in
# this module; they are kept in case other code imports them from here.
# Remove once that is confirmed.


class TaskScheduler(object):
    """Schedule and control when the data-crawling task runs.

    Wraps a ``BackgroundScheduler`` and tracks at most one config-driven
    interval job (the "request job"); extra untracked interval jobs can be
    added with :meth:`add_task`.
    """

    # Fixed id under which the config-driven request job is registered.
    _REQUEST_JOB_ID = "request_job"

    def __init__(self) -> None:
        self._scheduler = BackgroundScheduler()
        # Id of the tracked request job, or None when no such job is tracked.
        self._request_job_id = None

    def add_task_by_config(self, task_func, config: RequestTaskSetting) -> None:
        """Register the request job described by ``config``.

        Args:
            task_func: Callable the scheduler runs on every interval.
            config: Supplies ``request_interval_seconds``,
                ``request_start_time`` and ``request_end_time``.

        Raises:
            Whatever ``add_job`` raises (for example when a job with the same
            id already exists in a running scheduler).
        """
        self._scheduler.add_job(
            task_func,
            'interval',
            seconds=config.request_interval_seconds,
            start_date=config.request_start_time,
            end_date=config.request_end_time,
            id=self._REQUEST_JOB_ID,
            # A run that starts up to 10 minutes late is still executed.
            misfire_grace_time=600,
        )
        # Record the id only after add_job succeeded, so a failed registration
        # does not leave has_request_job() reporting a job that is not there.
        self._request_job_id = self._REQUEST_JOB_ID

    def remove_job(self) -> None:
        """Remove the tracked request job, if any, and stop tracking it.

        Safe to call repeatedly. A job the scheduler has already dropped on
        its own (for example after its end date passed) is treated as removed.
        """
        if self._request_job_id is None:
            return
        try:
            self._scheduler.remove_job(self._request_job_id)
        except JobLookupError:
            # The scheduler no longer holds the job; nothing left to remove.
            pass
        self._request_job_id = None

    def modify_task_by_config(self, config: RequestTaskSetting) -> None:
        """Re-schedule the tracked request job using the values in ``config``.

        Does nothing when no request job is tracked.

        Raises:
            JobLookupError: if the scheduler no longer holds the job.
        """
        if self._request_job_id is None:
            return
        # Interval, start and end are properties of the trigger, not of the
        # job. modify_job() rejects them with AttributeError, so the trigger
        # has to be rebuilt through reschedule_job().
        self._scheduler.reschedule_job(
            self._request_job_id,
            trigger='interval',
            seconds=config.request_interval_seconds,
            start_date=config.request_start_time,
            end_date=config.request_end_time,
        )

    def add_task(self, task_func, start_date, intervel, end_date=None) -> None:
        """Add an untracked interval job.

        Args:
            task_func: Callable the scheduler runs on every interval.
            start_date: Earliest time the job may run.
            intervel: Interval between runs, in seconds. The spelling is kept
                for backward compatibility with keyword callers.
            end_date: Latest time the job may run; None for no limit.
        """
        self._scheduler.add_job(
            task_func,
            'interval',
            seconds=intervel,
            start_date=start_date,
            end_date=end_date,
        )

    def start_task(self) -> None:
        """Start the scheduler unless it is already running."""
        if not self._scheduler.running:
            self._scheduler.start()

    def shutdown_task(self) -> None:
        """Shut the scheduler down; a no-op when it is not running."""
        # Mirrors the guard in start_task(): shutdown() raises on a scheduler
        # that is not running.
        if self._scheduler.running:
            self._scheduler.shutdown()

    def has_request_job(self) -> bool:
        """Return True while a request job is tracked by this instance."""
        return self._request_job_id is not None

    def is_finish(self) -> bool:
        """Return True when no job in the scheduler has a run left to execute.

        The scheduler removes a job once its trigger yields no further run
        time, so any job that still carries a ``next_run_time`` is unfinished.
        Paused jobs (``next_run_time`` is None) count as finished.

        The previous implementation stamped UTC onto the naive local wall
        clock and asked each job for the runs due by that instant, so its
        answer depended on the host's UTC offset.
        """
        for job in self._scheduler.get_jobs():
            try:
                next_run = job.next_run_time
            except AttributeError:
                # A job added before the scheduler started has no run time
                # computed yet; it has certainly not finished.
                return False
            if next_run is not None:
                return False
        return True