import time
|
from datetime import datetime, timedelta
|
|
from utils.log_utils import LogUtils
|
from utils.date_utils import DateUtils
|
from enumeration.enum_site_status import EnumSiteStatus
|
from config.config import Config
|
|
# 数据获取代理
|
|
|
def retry(func):
|
"""失败重试逻辑"""
|
|
def inner(self, retry_times, wait_time, tag, obj=None):
|
sucess = False
|
count = 0
|
while count < retry_times and sucess == False:
|
if obj == None:
|
sucess = func(self)
|
else:
|
sucess = func(self, obj)
|
|
if sucess == False:
|
count = count + 1
|
LogUtils.error(f'{tag}失败')
|
if count < retry_times:
|
LogUtils.error(f'程序会在{wait_time}秒后再次尝试, 剩余尝试次数{retry_times - count}')
|
time.sleep(wait_time)
|
else:
|
LogUtils.info(f'{tag}成功')
|
break
|
return sucess
|
return inner
|
|
|
def time_phased_execution(func):
|
"""分时段循环执行逻辑"""
|
|
def inner(self, start_time: datetime, end_time: datetime, range: int, wait_time: int, obj=None):
|
"""
|
Args:
|
start_time: 开始时间
|
end_time: 结束时间
|
range: 时间分段间隔,单位小时
|
wait_time: 每段循环执行间隔,单位秒
|
obj: 自定义任意类型附带参数
|
"""
|
# 根据给定参数,获取分割后的时段列表
|
time_list = DateUtils.time_slice(start_time, end_time, range)
|
print()
|
LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2)
|
LogUtils.info(f"分时段任务开始 | 起始: {start_time} | 结束: {end_time} | 周期: {range}小时 | 等待: {wait_time}秒 | 次数: {len(time_list)}")
|
|
for item in time_list:
|
time.sleep(wait_time)
|
r = func(self, obj, sTime=item[0], eTime=item[1])
|
if r == 'break':
|
LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2)
|
return 'break'
|
if r == False:
|
LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2)
|
return False
|
LogUtils.info("-----------------------------------------------------------------------------------------------------------------------------", prefix=2)
|
return inner
|
|
|
def get_min_time(site_time_list) -> datetime:
|
"""获取站点数据最新时刻列表中的时间最小值"""
|
|
if len(site_time_list) == 0:
|
# 最新时刻表为空时,默认按照配置参数返回开始时间
|
offset_days = Config().request_config.first_request_offset_days
|
t = DateUtils.today_0am()
|
return t - timedelta(days=offset_days)
|
else:
|
l = sorted(site_time_list, key=lambda x: x.latest_time)
|
return l[0].latest_time
|
|
|
def get_request_duration(start_time: datetime, site_status: EnumSiteStatus):
|
"""根据起始时间和设备状态, 计算数据爬取的时段范围"""
|
|
now = DateUtils.today_0am()
|
if site_status == EnumSiteStatus.Online:
|
return start_time, now
|
elif site_status == EnumSiteStatus.Stop:
|
st = start_time
|
diff_days = DateUtils.time_distance(start_time, now)
|
if diff_days > 30:
|
st = now - timedelta(days=30)
|
return st, now
|