from apscheduler.triggers.interval import IntervalTrigger from apscheduler.schedulers.background import BlockingScheduler from datetime import datetime class MainScheduler(object): """ 主线程控制类 """ def __init__(self) -> None: self._scheduler = BlockingScheduler() def add_task(self, task_func): """添加任务,主线程规定每一分钟轮询一次""" trigger_interval = IntervalTrigger(minutes=1) self._scheduler.add_job(task_func, trigger=trigger_interval, next_run_time=datetime.now(), misfire_grace_time=600) def start_task(self): """开始任务""" self._scheduler.start() def shutdown_task(self): """关闭任务""" self._scheduler.shutdown()