from datetime import datetime
from db.models.tables import RequestTaskSetting
from db.repository.site_info_rep import SiteInfoRepository
from db.repository.site_data_rep import SiteDataRepository
from db.repository.site_latest_time_rep import SiteLatestTimeRepository
from db.repository.request_task_rep import RequestTaskRepository
from api import login_api, site_info_api, site_data_api
from utils.log_utils import LogUtils
from analysis.data_statistics import DataStatistics
from enumeration.enum_site_status import EnumSiteStatus
from .request_task_proxy import retry, time_phased_execution, get_min_time, get_request_duration


class JinShanRequestTask(object):
    """Jinshan dust-monitoring data crawl task.

    Orchestrates one full crawl cycle: login, refresh of site base info,
    section-by-section download of dust monitoring data for stopped and
    online devices, de-duplication, and persistence via the repository layer.
    """

    def __init__(self, debug=False) -> None:
        """
        Args:
            debug: when True, stopped devices are skipped and the online
                   crawl window is pinned to a fixed half-hour slice.
        """
        self.debug = debug
        self._config = None  # RequestTaskSetting, injected via set_config()
        self._rep_site_info = SiteInfoRepository()
        self._rep_site_data = SiteDataRepository()
        self._rep_latest_time = SiteLatestTimeRepository()
        self._rep_request_task = RequestTaskRepository()
        self._data_statistics = DataStatistics()
        # mn_code -> latest data time; refreshed by _update_site_latest_time()
        self._site_latest_time_map = None

    def set_config(self, config: RequestTaskSetting):
        """Inject the task configuration (retry counts, wait times, ranges)."""
        self._config = config

    def _update_site_latest_time(self):
        """Refresh the in-memory map of each site's latest data timestamp."""
        rows = self._rep_latest_time.query_all()
        self._site_latest_time_map = {row.mn_code: row.latest_time for row in rows}

    @time_phased_execution
    def _task_get_data_by_section(self, obj, **kwargs) -> bool:
        """Fetch monitoring data for one device / scene type over one time
        slice, then run statistics, de-duplication and persistence.

        Args:
            obj: optional dict merged into the request parameters:
                 { site: site id, typeID: site type }
            **kwargs: { sTime: slice start time, eTime: slice end time }
                 (supplied per-slice by the @time_phased_execution decorator)

        Returns:
            False when the data API call fails (caller aborts / retries),
            otherwise the result of persisting the de-duplicated data.
        """
        LogUtils.info("", prefix=1)
        # Fetch the monitoring data for this time slice.
        if isinstance(obj, dict):
            kwargs.update(obj)
            LogUtils.info(f"分段 - 监测数据获取 | 站点: {obj.get('site')} | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        else:
            LogUtils.info(f"分段 - 监测数据获取 | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        # NOTE(review): `self` is passed positionally here; fetch_dust_data is
        # presumably wrapped with the same (retries, wait_seconds, label)
        # retry convention used by the @retry methods below — confirm against
        # the api.site_data_api module.
        res = site_data_api.fetch_dust_data(self,
                                            self._config.request_retry_times,
                                            self._config.request_fail_wait_seconds,
                                            "访问监测数据接口",
                                            kwargs)
        # API error: return False so the slice is retried immediately.
        if res is False:
            return False
        # Disabled early exit: slices are requested in ascending time order,
        # so an empty slice would mean no further data exists for this site.
        # if len(res) == 0:
        #     LogUtils.info(f"|- 站点: {obj.get('site')} 未获取到数据, 获取结束")
        #     return 'break'
        # Per-type record statistics.
        self._data_statistics.count_by_type(res, "TypeName")
        # De-duplicate against each site's latest stored timestamp: refresh
        # the latest-time map, group records by device code, then drop
        # records at or before the stored latest time ("LST" field).
        self._update_site_latest_time()
        data_map = self._data_statistics.data_grouping(res, "MNCode")
        res = self._data_statistics.data_deduplication(data_map, self._site_latest_time_map, "LST")
        # TODO anomaly analysis
        # Persist the data; this also updates each site's latest-time record
        # and status.
        res2 = self._rep_site_data.save_site_info(res)
        if res2:
            LogUtils.info("浓度数据写入完成, 站点最新数据时间更新成功! ")
        return res2

    @retry
    def task_login(self) -> bool:
        """Login task. Extra call args (retries, wait, label) go to @retry."""
        LogUtils.info('尝试登陆...')
        return login_api.login()

    @retry
    def task_get_site_info(self) -> bool:
        """Fetch and persist the base info of all monitoring sites."""
        LogUtils.info('尝试获取监测点基本信息...')
        res = site_info_api.fetch_site_info()
        # fetch_site_info returns False on failure.
        if res is False:
            return False
        return self._rep_site_info.save_site_info(res)

    @retry
    def task_get_site_data(self) -> bool:
        """Fetch monitoring data for all devices.

        Stopped devices are crawled one by one over a stretched time window;
        online devices are crawled together starting from the oldest
        latest-time among them. Returns False as soon as any slice fails so
        the @retry wrapper can re-run the whole task.
        """
        LogUtils.info('尝试获取监测点数据...')
        online_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Online.value)
        stop_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Stop.value)

        # --- stopped devices ---
        LogUtils.info(f'当前停运设备数{len(stop_list)}')
        if self.debug:
            stop_list = []
        # Crawl each stopped device individually.
        for item in stop_list:
            # Time window to crawl for this device.
            st, et = get_request_duration(item.latest_time, EnumSiteStatus.Stop)
            # Resolve the device's site/project id; skip unknown devices.
            site_info = self._rep_site_info.query_by_mncode(item.mn_code)
            if site_info is None:
                continue
            # Crawl slice by slice; a single site tolerates a much longer
            # range per slice, so stretch the normal range 90x (3 * 30).
            result = self._task_get_data_by_section(st, et,
                                                    self._config.request_range_hour * 3 * 30,
                                                    self._config.request_range_interval_seconds,
                                                    obj={'site': site_info.id})
            # Abort immediately if any slice fails.
            if result is False:
                return False
        # Refresh device statuses.
        self._rep_latest_time.refresh_site_status()

        # --- online devices ---
        LogUtils.info(f'当前上线设备数{len(online_list)}')
        # The oldest latest-time across online devices bounds the window.
        min_site_time = get_min_time(online_list)
        st, et = get_request_duration(min_site_time, EnumSiteStatus.Online)
        if self.debug:
            st = datetime(2023, 10, 17, 12, 0, 0)
            et = datetime(2023, 10, 17, 12, 30, 0)
            LogUtils.debug(f'调试模式 - 上线设备数据获取时段变更为:{st} - {et}')
        # Crawl slice by slice.
        result = self._task_get_data_by_section(st, et,
                                                self._config.request_range_hour,
                                                self._config.request_range_interval_seconds)
        # Abort immediately if any slice fails.
        if result is False:
            return False
        # Refresh device statuses.
        self._rep_latest_time.refresh_site_status()
        return True

    def do_request_task(self):
        """Run one full crawl task.

        Creates a task record, performs login / site-info / site-data steps,
        and always closes the task record with the final outcome, even on an
        unhandled exception or early return.
        """
        # Create the task record and tag all subsequent log output with it.
        task_id = self._rep_request_task.create_task()
        LogUtils.task_id = task_id
        task_success = False
        try:
            print()
            print()
            LogUtils.info("------------- 任务开启 -------------", prefix=1)
            # Without a session there is nothing else to do.
            if not self.task_login(self._config.login_retry_times,
                                   self._config.login_fail_wait_seconds,
                                   "登录"):
                return
            self.task_get_site_info(self._config.request_retry_times,
                                    self._config.request_fail_wait_seconds,
                                    "获取站点基本信息")
            task_success = self.task_get_site_data(self._config.request_retry_times,
                                                   self._config.request_fail_wait_seconds,
                                                   "获取站点监测数据")
        except Exception as e:
            LogUtils.error("任务中断,发生未处理异常:", e)
            task_success = False
        finally:
            LogUtils.info("------------- 任务结束 -------------", prefix=1)
            # Close the task record with the final outcome and untag logs.
            self._rep_request_task.complete_task(task_id, task_success)
            LogUtils.task_id = None