| | |
| | | package com.flightfeather.uav.biz.sourcetrace.model |
| | | |
| | | import com.flightfeather.uav.biz.FactorFilter |
| | | import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis |
| | | import com.flightfeather.uav.biz.sourcetrace.RealTimeAnalysisConfig |
| | | import com.flightfeather.uav.biz.sourcetrace.config.RTExcWindLevelConfig |
| | | import com.flightfeather.uav.biz.sourcetrace.exceptiontype.* |
| | | import com.flightfeather.uav.common.utils.GsonUtils |
| | | import com.flightfeather.uav.domain.entity.BaseRealTimeData |
| | | import com.flightfeather.uav.domain.repository.SceneInfoRep |
| | | import com.flightfeather.uav.socket.eunm.FactorType |
| | | import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender |
| | | import java.util.* |
| | | import com.flightfeather.uav.domain.entity.SceneInfo |
| | | import java.time.LocalDateTime |
| | | import java.util.Timer |
| | | import java.util.TimerTask |
| | | |
| | | // 异常数据生成回调类 |
| | | typealias NewPolluteSummaryCallback = (ex: PollutedSummary.AnalysisResult) -> Unit |
| | | |
| | | /** |
| | | * 污染情况汇总 |
| | |
| | | * @date 2025/5/27 |
| | | * @author feiyu02 |
| | | */ |
| | | class PollutedSummary { |
| | | class PollutedSummary(private val config: RTExcWindLevelConfig, private val callback: NewPolluteSummaryCallback) { |
| | | |
| | | |
| | | /** |
| | |
| | | * 每一刻钟对历史线索进行统计,提出会商建议(离污染源较远、污染源数量、出现次数)、走航路线调整建议(离污染源较近、走航轨迹未接近溯源场景) |
| | | */ |
| | | |
| | | constructor(sceneInfoRep: SceneInfoRep, factorFilter: FactorFilter?) { |
| | | this.sceneInfoRep = sceneInfoRep |
| | | this.config = if (factorFilter != null) |
| | | RTExcWindLevelConfig(factorFilter) |
| | | else |
| | | RTExcWindLevelConfig( |
| | | FactorFilter.builder() |
| | | // .withMain(FactorType.NO2) |
| | | .withMain(FactorType.CO) |
| | | // .withMain(FactorType.H2S) |
| | | // .withMain(FactorType.SO2) |
| | | // .withMain(FactorType.O3) |
| | | .withMain(FactorType.PM25) |
| | | .withMain(FactorType.PM10) |
| | | .withMain(FactorType.VOC) |
| | | .create() |
| | | ) |
| | | initTask() |
| | | } |
| | | |
| | | constructor(sceneInfoRep: SceneInfoRep) : this(sceneInfoRep, null) |
| | | |
| | | // 污染线索 |
| | | var clueList = mutableListOf<PollutedClue>() |
| | | |
| | | private val sceneInfoRep: SceneInfoRep |
| | | |
| | | private val config: RTExcWindLevelConfig |
| | | |
| | | private val taskList = mutableListOf<BaseExceptionAnalysis<RTExcWindLevelConfig, PollutedClue>>() |
| | | |
| | | fun initTask() { |
| | | taskList.clear() |
| | | taskList.apply { |
| | | add(RTExcWindLevel1(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel1_1(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel4(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel6(config) { exceptionCallback(it) }.also { it.init() }) |
| | | } |
| | | /** |
| | | * 分析结果 |
| | | */ |
| | | inner class AnalysisResult{ |
| | | // 按照被扫描次数降序排列的污染源列表 |
| | | var sortedSceneList: List<Pair<SceneInfo?, Int>>? = null |
| | | } |
| | | |
| | | /** |
| | | * 计算新的一条实时走航数据 |
| | | * 实时统计 |
| | | */ |
| | | fun addOneData(data: BaseRealTimeData) { |
| | | // 计算异常 |
| | | taskList.forEach { it.onNextData(data) } |
| | | // 限定时间内没有新数据传入,则结束当前的计算 |
| | | inner class AnalysisStatistic { |
| | | // 按照被扫描次数降序排列的污染源列表 |
| | | var sortedSceneList: List<Pair<SceneInfo?, Int>>? = null |
| | | } |
| | | |
| | | /** |
| | | * 超时处理,较长时间没有新数据进入,进行初始化操作 |
| | | */ |
| | | private fun dealOnTimeout() { |
| | | val timer = Timer(true) |
| | | timer.schedule(object : TimerTask() { |
| | | // 最新实时走航监测数据 |
| | | val realTimeDataList = mutableListOf<BaseRealTimeData>() |
| | | |
| | | // 未分析的污染线索 |
| | | val clueList = mutableListOf<PollutedClue>() |
| | | |
| | | // 已分析的污染线索 |
| | | private val historyClueList = mutableListOf<PollutedClue>() |
| | | |
| | | // 定时污染分析任务控制 |
| | | private var analysisTimer: Timer? = null |
| | | |
| | | // 定时污染分析任务 |
| | | private val analysisOnTimeTask = object : TimerTask() { |
| | | override fun run() { |
| | | TODO("Not yet implemented") |
| | | // 记录任务运行状态 |
| | | analysisTaskIsRunning = true |
| | | analysis() |
| | | // 记录上一次的任务结束时间 |
| | | lastAnalysisTime = LocalDateTime.now() |
| | | analysisTaskIsRunning = false |
| | | } |
| | | }, 60 * 1000) |
| | | timer.cancel() |
| | | } |
| | | |
| | | // 数据突变异常回调 |
| | | private fun exceptionCallback(ex: PollutedClue) { |
| | | // 溯源污染源信息 |
| | | ex.searchScenes(sceneInfoRep) |
| | | clueList |
| | | // 广播污染溯源异常结果 |
| | | UnderwayWebSocketSender.broadcast(GsonUtils.gson.toJson(ex)) |
| | | // 定时污染分析任务运行状态 |
| | | private var analysisTaskIsRunning = false |
| | | |
| | | // 上一次定时污染分析任务结束时间 |
| | | private lateinit var lastAnalysisTime: LocalDateTime |
| | | |
| | | init { |
| | | clear() |
| | | } |
| | | |
| | | // 新增一条污染线索 |
| | | fun addClue(pollutedClue: PollutedClue) { |
| | | clueList.add(pollutedClue) |
| | | // realTimeSummary() |
| | | analysisOnClueCount() |
| | | } |
| | | |
| | | // 刷新当前最新的走航监测数据 |
| | | fun refreshLatestMonitorData(data: BaseRealTimeData) { |
| | | // realTimeDataList.clear() |
| | | realTimeDataList.add(data) |
| | | } |
| | | |
| | | fun clear() { |
| | | realTimeDataList.clear() |
| | | clueList.clear() |
| | | historyClueList.clear() |
| | | analysisTimer?.cancel() |
| | | analysisTimer = null |
| | | analysisTaskIsRunning = false |
| | | lastAnalysisTime = LocalDateTime.now() |
| | | resetAnalysisOnTime() |
| | | } |
| | | |
| | | /** |
| | | * 重置定时分析线索任务 |
| | | */ |
| | | private fun resetAnalysisOnTime() { |
| | | // 取消原有的分析任务计时 |
| | | analysisTimer?.cancel() |
| | | // 以当前时间为起点,重新开始新的一轮等待计时 |
| | | analysisTimer = Timer() |
| | | val period = config.analysisPeriod * 60 * 1000L |
| | | analysisTimer?.schedule(analysisOnTimeTask, period, period) |
| | | } |
| | | |
| | | /** |
| | | * 在定时污染线索分析任务等待周期时间内,若污染线索量超过设定值,直接触发分析线索任务 |
| | | * 并重置定时分析任务 |
| | | */ |
| | | private fun analysisOnClueCount() { |
| | | if (clueList.size >= config.analysisCount && !analysisTaskIsRunning) { |
| | | analysisOnTimeTask.run() |
| | | resetAnalysisOnTime() |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 实时线索统计 |
| | | */ |
| | | private fun realTimeSummary() { |
| | | val statistic = AnalysisStatistic() |
| | | // 共有多少相关污染源,哪些污染源被扫描次数较多 |
| | | val sceneMap = mutableMapOf<String?, Pair<SceneInfo?, Int>>() |
| | | clueList.forEach {c-> |
| | | c.pollutedSource?.sceneList?.forEach { s-> |
| | | if (!sceneMap.containsKey(s?.guid)) { |
| | | sceneMap[s?.guid] = s to 1 |
| | | } else { |
| | | sceneMap[s?.guid] = s to (sceneMap[s?.guid]?.second!! + 1) |
| | | } |
| | | } |
| | | } |
| | | val res = sceneMap.entries.sortedBy { it.value.second } |
| | | statistic.sortedSceneList = res.map { it.value } |
| | | } |
| | | |
| | | /** |
| | | * 线索分析 |
| | | */ |
| | | private fun analysis() { |
| | | val result = AnalysisResult() |
| | | // 共有多少相关污染源,哪些污染源被扫描次数较多 |
| | | val sceneMap = mutableMapOf<String?, Pair<SceneInfo?, Int>>() |
| | | clueList.forEach {c-> |
| | | c.pollutedSource?.sceneList?.forEach { s-> |
| | | if (!sceneMap.containsKey(s?.guid)) { |
| | | sceneMap[s?.guid] = s to 1 |
| | | } else { |
| | | sceneMap[s?.guid] = s to (sceneMap[s?.guid]?.second!! + 1) |
| | | } |
| | | } |
| | | } |
| | | val res = sceneMap.entries.sortedBy { it.value.second } |
| | | result.sortedSceneList = res.map { it.value } |
| | | |
| | | // 当前的走航数据的定位和污染源距离是否是逐渐接近,若走航远离了主要污染源,提示用户调整走航路线 |
| | | |
| | | |
| | | // 线索分析完成后,移动至历史线索列表 |
| | | historyClueList.addAll(clueList) |
| | | clueList.clear() |
| | | realTimeDataList.clear() |
| | | |
| | | callback(result) |
| | | // TODO() |
| | | } |
| | | |
| | | } |