riku
2024-01-10 a9e8e27e0503552b7b2a99c821da732175d4f071
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import time
from datetime import datetime, timedelta
 
from utils.log_utils import LogUtils
from utils.date_utils import DateUtils
from enumeration.enum_site_status import EnumSiteStatus
from config.config import Config
 
# 数据获取代理
 
 
def retry(func):
    """Decorator that re-runs a method until it succeeds or attempts run out.

    The decorated method is invoked as ``func(self)`` or, when ``obj`` is
    supplied, as ``func(self, obj)``. A return value equal to ``False`` counts
    as a failed attempt; any other value (including ``None``) counts as
    success.

    The decorated method is then called with the signature
    ``(self, retry_times, wait_time, tag, obj=None)``.
    """
    from functools import wraps

    @wraps(func)
    def inner(self, retry_times, wait_time, tag, obj=None):
        """Run the wrapped method with retries.

        Args:
            retry_times: Maximum number of attempts. If it is not positive the
                wrapped method is never called.
            wait_time: Seconds to sleep between two attempts.
            tag: Label inserted into the success/failure log messages.
            obj: Optional extra argument forwarded to the wrapped method.

        Returns:
            The value returned by the last attempt, or ``False`` when no
            attempt was made.
        """
        result = False
        attempts = 0
        while attempts < retry_times:
            # Identity check on purpose: ``obj == None`` would dispatch to
            # obj.__eq__, which may return a truthy value or an array-like
            # object and either drop the argument or raise.
            result = func(self) if obj is None else func(self, obj)

            # Kept as ``== False`` rather than ``not result`` so that a method
            # returning ``None`` is still treated as successful.
            if result == False:  # noqa: E712
                attempts += 1
                LogUtils.error(f'{tag}失败')
                if attempts < retry_times:
                    LogUtils.error(f'程序会在{wait_time}秒后再次尝试, 剩余尝试次数{retry_times - attempts}')
                    time.sleep(wait_time)
            else:
                LogUtils.info(f'{tag}成功')
                break
        return result
    return inner
 
 
def time_phased_execution(func):
    """Decorator that runs a method once per time slice of a larger period.

    The period ``[start_time, end_time]`` is split by ``DateUtils.time_slice``
    and the decorated method is called for every slice as
    ``func(self, obj, sTime=<slice[0]>, eTime=<slice[1]>)``. The loop stops
    early as soon as a call returns ``'break'`` or a value equal to ``False``.
    """
    from functools import wraps

    # Visual separator written to the log around every phased run.
    separator = "-----------------------------------------------------------------------------------------------------------------------------"

    @wraps(func)
    def inner(self, start_time: datetime, end_time: datetime, range: int, wait_time: int, obj=None):
        """Execute the wrapped method slice by slice.

        Args:
            start_time: Start of the overall period.
            end_time: End of the overall period.
            range: Slice interval in hours. The name shadows the builtin but
                is kept so existing keyword callers keep working.
            wait_time: Seconds to sleep before each slice is processed
                (including the first one).
            obj: Arbitrary user-defined value forwarded to the wrapped method.

        Returns:
            ``'break'`` or ``False`` when a slice requested an early stop,
            otherwise ``None`` once every slice has been processed.
        """
        # Split the requested period into the list of slices to process.
        time_list = DateUtils.time_slice(start_time, end_time, range)
        print()
        LogUtils.info(separator, prefix=2)
        LogUtils.info(f"分时段任务开始 | 起始: {start_time} | 结束: {end_time} | 周期: {range}小时 | 等待: {wait_time}秒 | 次数: {len(time_list)}")

        for item in time_list:
            time.sleep(wait_time)
            outcome = func(self, obj, sTime=item[0], eTime=item[1])
            if outcome == 'break':
                LogUtils.info(separator, prefix=2)
                return 'break'
            # ``== False`` (not ``not outcome``) so a ``None`` result keeps
            # the loop going.
            if outcome == False:  # noqa: E712
                LogUtils.info(separator, prefix=2)
                return False
        LogUtils.info(separator, prefix=2)
        return None
    return inner
 
 
def get_min_time(site_time_list) -> datetime:
    """Return the earliest ``latest_time`` among the given site records.

    Args:
        site_time_list: Collection of objects exposing a ``latest_time``
            attribute. ``None`` is treated like an empty collection.

    Returns:
        The smallest ``latest_time`` found. When the collection is empty, the
        value of ``DateUtils.today_0am()`` moved back by the configured
        ``first_request_offset_days`` days.
    """
    if not site_time_list:
        # Nothing recorded yet: fall back to the configured default start.
        offset_days = Config().request_config.first_request_offset_days
        return DateUtils.today_0am() - timedelta(days=offset_days)

    # A single linear pass is enough; no need to sort the whole collection.
    return min(site.latest_time for site in site_time_list)
 
 
def get_request_duration(start_time: datetime, site_status: EnumSiteStatus):
    """Work out the time window to crawl from a start time and a site status.

    Args:
        start_time: Requested beginning of the window.
        site_status: Current status of the site/device.

    Returns:
        A ``(start, end)`` tuple where ``end`` is ``DateUtils.today_0am()``.
        For ``Online`` sites the start is returned unchanged. For ``Stop``
        sites the start is replaced by ``end - 30 days`` when
        ``DateUtils.time_distance(start_time, end)`` exceeds 30 (presumably a
        day count; confirm against DateUtils). Any other status yields
        ``None``.
    """
    window_end = DateUtils.today_0am()

    if site_status == EnumSiteStatus.Online:
        return start_time, window_end

    if site_status == EnumSiteStatus.Stop:
        distance = DateUtils.time_distance(start_time, window_end)
        # Stopped sites are only back-filled for the most recent 30 days.
        window_start = window_end - timedelta(days=30) if distance > 30 else start_time
        return window_start, window_end

    return None