riku
2024-01-10 a9e8e27e0503552b7b2a99c821da732175d4f071
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from datetime import datetime
from db.models.tables import RequestTaskSetting
from db.repository.site_info_rep import SiteInfoRepository
from db.repository.site_data_rep import SiteDataRepository
from db.repository.site_latest_time_rep import SiteLatestTimeRepository
from db.repository.request_task_rep import RequestTaskRepository
from api import login_api, site_info_api, site_data_api
from utils.log_utils import LogUtils
from analysis.data_statistics import DataStatistics
from enumeration.enum_site_status import EnumSiteStatus
 
from .request_task_proxy import retry, time_phased_execution, get_min_time, get_request_duration
 
 
class JinShanRequestTask(object):
    """金山扬尘监测数据爬取任务"""
 
    def __init__(self, debug=False) -> None:
        self.debug = debug
        self._config = None
        self._rep_site_info = SiteInfoRepository()
        self._rep_site_data = SiteDataRepository()
        self._rep_latest_time = SiteLatestTimeRepository()
        self._rep_request_task = RequestTaskRepository()
        self._data_statistics = DataStatistics()
        self._site_latest_time_map = None
 
    def set_config(self, config: RequestTaskSetting):
        self._config = config
 
    def _update_site_latest_time(self):
        """更新站点数据最新时刻表"""
        l = self._rep_latest_time.query_all()
        m = {}
        for item in l:
            m[item.mn_code] = item.latest_time
        self._site_latest_time_map = m
 
    @time_phased_execution
    def _task_get_data_by_section(self, obj, **kwargs) -> bool:
        """
        获取某个设备或某个场景类型下某个时段的监测数据,并做后续处理
        Args:
            obj: {
                site: 站点编号
                typeID: 站点类型
            }
            **kwargs:{
                sTime: 开始时间
                eTime: 结束时间
            }
 
        """
        LogUtils.info("", prefix=1)
 
        # 获取监测数据
        if isinstance(obj, dict):
            kwargs.update(obj)
            LogUtils.info(f"分段 - 监测数据获取 | 站点: {obj.get('site')} | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        else:
            LogUtils.info(f"分段 - 监测数据获取 | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        res = site_data_api.fetch_dust_data(self, self._config.request_retry_times, self._config.request_fail_wait_seconds, "访问监测数据接口", kwargs)
 
        # 接口报错,返回直接重试
        if res == False:
            return False
 
        # 无数据时,由于循环获取是按时间升序进行的,因此直接跳出
        # if len(res) == 0:
        #     LogUtils.info(f"|- 站点: {obj.get('site')} 未获取到数据, 获取结束")
        #     return 'break'
 
        # 数据统计
        self._data_statistics.count_by_type(res, "TypeName")
 
        # 数据去重
        self._update_site_latest_time()  # 更新站点数据最新时刻表
        data_map = self._data_statistics.data_grouping(res, "MNCode")
        res = self._data_statistics.data_deduplication(data_map, self._site_latest_time_map, "LST")
 
        # TODO 异常分析
 
        # 更新站点最新时刻表及站点状态, 将监测数据入库
        res2 = self._rep_site_data.save_site_info(res)
        if res2:
            LogUtils.info("浓度数据写入完成, 站点最新数据时间更新成功! ")
        return res2
 
    @retry
    def task_login(self) -> bool:
        """登陆任务"""
 
        LogUtils.info(f'尝试登陆...')
        return login_api.login()
 
    @retry
    def task_get_site_info(self) -> bool:
        """获取监测点基本信息任务"""
 
        LogUtils.info(f'尝试获取监测点基本信息...')
        res = site_info_api.fetch_site_info()
        if res != False:
            return self._rep_site_info.save_site_info(res)
        else:
            return False
 
    @retry
    def task_get_site_data(self) -> bool:
        """
        获取监测点数据任务
        """
 
        LogUtils.info(f'尝试获取监测点数据...')
        online_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Online.value)
        stop_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Stop.value)
 
        """停运设备"""
        LogUtils.info(f'当前停运设备数{len(stop_list)}')
        if self.debug:
            stop_list = []
        # 停运设备每一台单独爬取数据
        for item in stop_list:
            # 获取数据爬取时段
            st, et = get_request_duration(item.latest_time, EnumSiteStatus.Stop)
            # 获取设备对应项目编号
            site_info = self._rep_site_info.query_by_mncode(item.mn_code)
            if site_info == None:
                continue
            # 分时段执行数据爬取, 单个站点数据获取的时长可以拉长,设定正常获取的90倍
            result = self._task_get_data_by_section(st, et, self._config.request_range_hour * 3 * 30,
                                                    self._config.request_range_interval_seconds, obj={'site': site_info.id})
            # 若爬取报错则立即返回
            if result == False:
                return False
        # 刷新设备状态
        self._rep_latest_time.refresh_site_status()
 
        """上线设备"""
        LogUtils.info(f'当前上线设备数{len(online_list)}')
        # 获取时间最小项
        min_site_time = get_min_time(online_list)
        # 获取数据爬取时段
        st, et = get_request_duration(min_site_time, EnumSiteStatus.Online)
        if self.debug:
            st = datetime(2023, 10, 17, 12, 00, 00)
            et = datetime(2023, 10, 17, 12, 30, 00)
            LogUtils.debug(f'调试模式 - 上线设备数据获取时段变更为:{st} - {et}')
        # 分时段执行数据爬取
        result = self._task_get_data_by_section(st, et, self._config.request_range_hour, self._config.request_range_interval_seconds)
        # 若爬取报错则立即返回
        if result == False:
            return False
        # 刷新设备状态
        self._rep_latest_time.refresh_site_status()
 
        return True
 
    def do_request_task(self):
        """执行数据爬取任务"""
 
        # 创建任务
        task_id = self._rep_request_task.create_task()
        LogUtils.task_id = task_id
        task_success = False
        try:
            print()
            print()
            LogUtils.info("------------- 任务开启 -------------", prefix=1)
            if not self.task_login(self._config.login_retry_times, self._config.login_fail_wait_seconds, "登录"):
                return
            self.task_get_site_info(self._config.request_retry_times, self._config.request_fail_wait_seconds, "获取站点基本信息")
            task_success = self.task_get_site_data(self._config.request_retry_times, self._config.request_fail_wait_seconds, "获取站点监测数据")
        except Exception as e:
            LogUtils.error("任务中断,发生未处理异常:", e)
            task_success = False
        finally:
            LogUtils.info("------------- 任务结束 -------------", prefix=1)
            # 完成任务,更新任务信息
            self._rep_request_task.complete_task(task_id, task_success)
            LogUtils.task_id = None
 
        # LogUtils.info("执行数据爬取任务")