import time from datetime import datetime, timedelta from utils.log_utils import LogUtils from utils.date_utils import DateUtils from enumeration.enum_site_status import EnumSiteStatus from config.config import Config # 数据获取代理 def retry(func): """失败重试逻辑""" def inner(self, retry_times, wait_time, tag, obj=None): sucess = False count = 0 while count < retry_times and sucess == False: if obj == None: sucess = func(self) else: sucess = func(self, obj) if sucess == False: count = count + 1 LogUtils.error(f'{tag}失败') if count < retry_times: LogUtils.error(f'程序会在{wait_time}秒后再次尝试, 剩余尝试次数{retry_times - count}') time.sleep(wait_time) else: LogUtils.info(f'{tag}成功') break return sucess return inner def time_phased_execution(func): """分时段循环执行逻辑""" def inner(self, start_time: datetime, end_time: datetime, range: int, wait_time: int, obj=None): """ Args: start_time: 开始时间 end_time: 结束时间 range: 时间分段间隔,单位小时 wait_time: 每段循环执行间隔,单位秒 obj: 自定义任意类型附带参数 """ # 根据给定参数,获取分割后的时段列表 time_list = DateUtils.time_slice(start_time, end_time, range) print() LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2) LogUtils.info(f"分时段任务开始 | 起始: {start_time} | 结束: {end_time} | 周期: {range}小时 | 等待: {wait_time}秒 | 次数: {len(time_list)}") for item in time_list: time.sleep(wait_time) r = func(self, obj, sTime=item[0], eTime=item[1]) if r == 'break': LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2) return 'break' if r == False: LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2) return False LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2) return inner def get_min_time(site_time_list) -> datetime: """获取站点数据最新时刻列表中的时间最小值""" if len(site_time_list) == 0: # 最新时刻表为空时,默认按照配置参数返回开始时间 offset_days = Config().request_config.first_request_offset_days t = DateUtils.today_0am() return t - timedelta(days=offset_days) else: l = sorted(site_time_list, key=lambda x: x.latest_time) return l[0].latest_time def get_request_duration(start_time: datetime, site_status: EnumSiteStatus): """根据起始时间和设备状态, 计算数据爬取的时段范围""" now = DateUtils.today_0am() if site_status == EnumSiteStatus.Online: return start_time, now elif site_status == EnumSiteStatus.Stop: st = start_time diff_days = DateUtils.time_distance(start_time, now) if diff_days > 30: st = now - timedelta(days=30) return st, now