|
from apscheduler.triggers.interval import IntervalTrigger
|
from apscheduler.schedulers.background import BlockingScheduler
|
from datetime import datetime
|
|
class MainScheduler(object):
|
"""
|
主线程控制类
|
"""
|
|
def __init__(self) -> None:
|
self._scheduler = BlockingScheduler()
|
|
def add_task(self, task_func):
|
"""添加任务,主线程规定每一分钟轮询一次"""
|
trigger_interval = IntervalTrigger(minutes=1)
|
self._scheduler.add_job(task_func, trigger=trigger_interval, next_run_time=datetime.now(), misfire_grace_time=600)
|
|
def start_task(self):
|
"""开始任务"""
|
self._scheduler.start()
|
|
def shutdown_task(self):
|
"""关闭任务"""
|
self._scheduler.shutdown()
|