from datetime import datetime
|
from db.models.tables import RequestTaskSetting
|
from db.repository.site_info_rep import SiteInfoRepository
|
from db.repository.site_data_rep import SiteDataRepository
|
from db.repository.site_latest_time_rep import SiteLatestTimeRepository
|
from db.repository.request_task_rep import RequestTaskRepository
|
from api import login_api, site_info_api, site_data_api
|
from utils.log_utils import LogUtils
|
from analysis.data_statistics import DataStatistics
|
from enumeration.enum_site_status import EnumSiteStatus
|
|
from .request_task_proxy import retry, time_phased_execution, get_min_time, get_request_duration
|
|
|
class JinShanRequestTask:
    """Crawling task for Jinshan dust (扬尘) monitoring data.

    Usage: construct the task, call ``set_config`` with a
    ``RequestTaskSetting``, then call ``do_request_task``.
    """

    # Stopped sites are crawled one site at a time, so each time section can
    # be much longer than in the bulk online-site crawl: 3 * 30 = 90 times
    # the configured ``request_range_hour``.
    _STOP_SITE_RANGE_FACTOR = 3 * 30

    def __init__(self, debug: bool = False) -> None:
        """Create the task and the repositories/helpers it uses.

        Args:
            debug: When True, stopped sites are skipped and the online-site
                crawl uses a fixed time window (see ``task_get_site_data``).
        """
        self.debug = debug
        # Task configuration; must be provided via set_config() before running.
        self._config = None
        self._rep_site_info = SiteInfoRepository()
        self._rep_site_data = SiteDataRepository()
        self._rep_latest_time = SiteLatestTimeRepository()
        self._rep_request_task = RequestTaskRepository()
        self._data_statistics = DataStatistics()
        # mn_code -> latest_time map, rebuilt by _update_site_latest_time().
        self._site_latest_time_map = None

    def set_config(self, config: RequestTaskSetting) -> None:
        """Set the configuration (retry counts, wait times, time ranges) used by the task."""
        self._config = config

    def _update_site_latest_time(self) -> None:
        """Reload the ``mn_code -> latest_time`` map from the site latest-time table."""
        records = self._rep_latest_time.query_all()
        self._site_latest_time_map = {rec.mn_code: rec.latest_time for rec in records}

    @time_phased_execution
    def _task_get_data_by_section(self, obj, **kwargs) -> bool:
        """Fetch, deduplicate and store monitoring data for one time section.

        Callers invoke this through ``time_phased_execution`` to crawl a time
        range section by section. The section bounds arrive as the ``sTime`` /
        ``eTime`` keyword arguments.

        Args:
            obj: Extra request filters. When it is a dict, e.g.
                ``{'site': <site id>, 'typeID': <site type>}``, its entries are
                merged into the request parameters. Any non-dict value is
                ignored.
            **kwargs: Request parameters, including:
                sTime: section start time.
                eTime: section end time.

        Returns:
            False if the data API call failed. Otherwise, the result of
            ``SiteDataRepository.save_site_info`` for the deduplicated data.
        """
        LogUtils.info("", prefix=1)

        # Fetch the monitoring data for this section.
        if isinstance(obj, dict):
            kwargs.update(obj)
            LogUtils.info(f"分段 - 监测数据获取 | 站点: {obj.get('site')} | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        else:
            LogUtils.info(f"分段 - 监测数据获取 | 起始: {kwargs['sTime']} | 结束: {kwargs['eTime']}")
        res = site_data_api.fetch_dust_data(
            self,
            self._config.request_retry_times,
            self._config.request_fail_wait_seconds,
            "访问监测数据接口",
            kwargs,
        )

        # The API signals failure with False; report it so the caller can retry.
        if res == False:
            return False

        # NOTE: an empty result is deliberately NOT treated as "no more data".
        # An early exit for that case was disabled, so later sections are
        # still requested.

        # Per-type statistics over the raw records.
        self._data_statistics.count_by_type(res, "TypeName")

        # Deduplicate against each site's latest stored time. The map is
        # reloaded for every section, since saving a previous section is
        # expected to have advanced it.
        self._update_site_latest_time()
        data_map = self._data_statistics.data_grouping(res, "MNCode")
        res = self._data_statistics.data_deduplication(data_map, self._site_latest_time_map, "LST")

        # TODO: anomaly analysis.

        # Store the data. This step is also expected to update each site's
        # latest-time record and status.
        saved = self._rep_site_data.save_site_info(res)
        if saved:
            LogUtils.info("浓度数据写入完成, 站点最新数据时间更新成功! ")
        return saved

    @retry
    def task_login(self) -> bool:
        """Log in via ``login_api``.

        Callers pass (retry count, wait seconds, task label). Those arguments
        are consumed by the ``@retry`` decorator, not by this method.

        Returns:
            The result of ``login_api.login()``.
        """
        LogUtils.info('尝试登陆...')
        return login_api.login()

    @retry
    def task_get_site_info(self) -> bool:
        """Fetch basic monitoring-site information and store it.

        Callers pass (retry count, wait seconds, task label) for the
        ``@retry`` decorator.

        Returns:
            False if the fetch failed. Otherwise, the result of
            ``SiteInfoRepository.save_site_info``.
        """
        LogUtils.info('尝试获取监测点基本信息...')
        res = site_info_api.fetch_site_info()
        # The API signals failure with False (compared by value, as before).
        if res != False:
            return self._rep_site_info.save_site_info(res)
        return False

    @retry
    def task_get_site_data(self) -> bool:
        """Fetch monitoring data for stopped sites first, then for online sites.

        Stopped sites are crawled one by one, each over a window derived from
        its own latest time. Stopped sites with no site-info record are
        skipped. Online sites are crawled together over a window derived from
        the smallest latest time among them. Site statuses are refreshed after
        each phase.

        In debug mode, stopped sites are skipped and the online crawl uses the
        fixed window 2023-10-17 12:00 - 12:30.

        Callers pass (retry count, wait seconds, task label) for the
        ``@retry`` decorator.

        Returns:
            False as soon as a section crawl reports failure, otherwise True.
        """
        LogUtils.info('尝试获取监测点数据...')
        online_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Online.value)
        stop_list = self._rep_latest_time.query_by_site_status(EnumSiteStatus.Stop.value)

        # ---- Stopped sites ----
        LogUtils.info(f'当前停运设备数{len(stop_list)}')
        if self.debug:
            stop_list = []

        # Each stopped site is crawled on its own.
        for item in stop_list:
            st, et = get_request_duration(item.latest_time, EnumSiteStatus.Stop)
            # Look up the site record to obtain the site id used by the API.
            site_info = self._rep_site_info.query_by_mncode(item.mn_code)
            if site_info is None:
                continue
            result = self._task_get_data_by_section(
                st,
                et,
                self._config.request_range_hour * self._STOP_SITE_RANGE_FACTOR,
                self._config.request_range_interval_seconds,
                obj={'site': site_info.id},
            )
            # Abort immediately on failure so the @retry wrapper can react.
            if result == False:
                return False

        self._rep_latest_time.refresh_site_status()

        # ---- Online sites ----
        LogUtils.info(f'当前上线设备数{len(online_list)}')
        # Start from the site with the smallest latest time.
        min_site_time = get_min_time(online_list)
        st, et = get_request_duration(min_site_time, EnumSiteStatus.Online)
        if self.debug:
            st = datetime(2023, 10, 17, 12, 0, 0)
            et = datetime(2023, 10, 17, 12, 30, 0)
            LogUtils.debug(f'调试模式 - 上线设备数据获取时段变更为:{st} - {et}')

        result = self._task_get_data_by_section(
            st,
            et,
            self._config.request_range_hour,
            self._config.request_range_interval_seconds,
        )
        # Abort immediately on failure so the @retry wrapper can react.
        if result == False:
            return False

        self._rep_latest_time.refresh_site_status()
        return True

    def do_request_task(self) -> None:
        """Run one complete crawl task and record its outcome.

        The steps are: create a task record, log in, refresh site info, then
        crawl site data. Any unhandled exception is logged and the task is
        marked as failed. The task record is always completed with the
        success flag. ``LogUtils.task_id`` is always cleared afterwards, even
        if completing the record raises.

        A failed site-info refresh does not stop the data crawl: its return
        value is not checked.
        """
        task_id = self._rep_request_task.create_task()
        LogUtils.task_id = task_id
        task_success = False
        try:
            print()
            print()
            LogUtils.info("------------- 任务开启 -------------", prefix=1)
            if self._config is None:
                raise RuntimeError("set_config() must be called before do_request_task()")
            if not self.task_login(self._config.login_retry_times, self._config.login_fail_wait_seconds, "登录"):
                return
            self.task_get_site_info(self._config.request_retry_times, self._config.request_fail_wait_seconds, "获取站点基本信息")
            task_success = self.task_get_site_data(self._config.request_retry_times, self._config.request_fail_wait_seconds, "获取站点监测数据")
        except Exception as e:
            LogUtils.error("任务中断,发生未处理异常:", e)
            task_success = False
        finally:
            LogUtils.info("------------- 任务结束 -------------", prefix=1)
            try:
                self._rep_request_task.complete_task(task_id, task_success)
            finally:
                # Never leak this task id into later log lines.
                LogUtils.task_id = None
|
# LogUtils.info("执行数据爬取任务")
|