riku
2024-01-10 a9e8e27e0503552b7b2a99c821da732175d4f071
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from apscheduler.triggers.interval import IntervalTrigger
import datetime
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from db.models.tables import RequestTaskSetting
 
 
class TaskScheduler(object):
    """
    任务调度类,管理控制数据爬取任务执行时机
    """
 
    def __init__(self) -> None:
        self._scheduler = BackgroundScheduler()
        self._request_job_id = None
 
    def add_task_by_config(self, task_func, config: RequestTaskSetting):
        """根据爬取任务配置信息添加任务"""
        self._request_job_id = "request_job"
        # trigger_interval = IntervalTrigger(seconds=config.request_interval_seconds)
        # self._scheduler.add_job(task_func, trigger=trigger_interval,
        #                         start_date=config.request_start_time, end_date=config.request_end_time, id=self._request_job_id)
        self._scheduler.add_job(task_func, 'interval', seconds=config.request_interval_seconds,
                                start_date=config.request_start_time, end_date=config.request_end_time, id=self._request_job_id, misfire_grace_time=600)
        
    def remove_job(self):
        """移除任务"""
        if not self._request_job_id == None:
            self._scheduler.remove_job(self._request_job_id)
 
    def modify_task_by_config(self, config: RequestTaskSetting):
        """根据爬取任务配置信息修改任务"""
        if self._request_job_id == None:
            return
        self._scheduler.modify_job(self._request_job_id, seconds=config.request_interval_seconds,
                                   start_date=config.request_start_time, end_date=config.request_end_time)
 
    def add_task(self, task_func, start_date, intervel, end_date=None):
        """添加任务"""
        # trigger_interval = IntervalTrigger(seconds=intervel)
        # self._scheduler.add_job(task_func, trigger=trigger_interval, start_date=start_date, end_date=end_date)
        self._scheduler.add_job(task_func, 'interval', seconds=intervel, start_date=start_date, end_date=end_date)
 
    def start_task(self):
        """开始任务"""
        if not self._scheduler.running:
            self._scheduler.start()
 
    def shutdown_task(self):
        """关闭任务"""
        self._scheduler.shutdown()
 
    def has_request_job(self):
        return not self._request_job_id == None
 
    def is_finish(self):
        """判断当前线程内的所有定时任务是否完成"""
        finish = True
        jobs = self._scheduler.get_jobs()
        now = datetime.datetime.now().replace(tzinfo=pytz.timezone('UTC'))
        for j in jobs:
            t = j._get_run_times(now)
            finish = len(t) == 0
            if finish == False:
                break
        return finish