feiyu02
2025-06-04 cc2a28ad6b99795d05cd9c923d8f7da27b4509e3
src/main/kotlin/com/flightfeather/uav/biz/sourcetrace/model/PollutedSummary.kt
@@ -1,16 +1,14 @@
package com.flightfeather.uav.biz.sourcetrace.model
import com.flightfeather.uav.biz.FactorFilter
import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis
import com.flightfeather.uav.biz.sourcetrace.RealTimeAnalysisConfig
import com.flightfeather.uav.biz.sourcetrace.config.RTExcWindLevelConfig
import com.flightfeather.uav.biz.sourcetrace.exceptiontype.*
import com.flightfeather.uav.common.utils.GsonUtils
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.domain.repository.SceneInfoRep
import com.flightfeather.uav.socket.eunm.FactorType
import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender
import java.util.*
import com.flightfeather.uav.domain.entity.SceneInfo
import java.time.LocalDateTime
import java.util.Timer
import java.util.TimerTask
// 异常数据生成回调类
typealias NewPolluteSummaryCallback = (ex: PollutedSummary.AnalysisResult) -> Unit
/**
 * 污染情况汇总
@@ -18,7 +16,7 @@
 * @date 2025/5/27
 * @author feiyu02
 */
class PollutedSummary {
class PollutedSummary(private val config: RTExcWindLevelConfig, private val callback: NewPolluteSummaryCallback) {
    /**
@@ -26,75 +24,159 @@
     * 每一刻钟对历史线索进行统计,提出会商建议(离污染源较远、污染源数量、出现次数)、走航路线调整建议(离污染源较近、走航轨迹未接近溯源场景)
     */
    constructor(sceneInfoRep: SceneInfoRep, factorFilter: FactorFilter?) {
        this.sceneInfoRep = sceneInfoRep
        this.config = if (factorFilter != null)
            RTExcWindLevelConfig(factorFilter)
        else
            RTExcWindLevelConfig(
                FactorFilter.builder()
//                .withMain(FactorType.NO2)
                    .withMain(FactorType.CO)
//                .withMain(FactorType.H2S)
//                .withMain(FactorType.SO2)
//                .withMain(FactorType.O3)
                    .withMain(FactorType.PM25)
                    .withMain(FactorType.PM10)
                    .withMain(FactorType.VOC)
                    .create()
            )
        initTask()
    /**
     * 分析结果
     */
    inner class AnalysisResult{
        // 按照被扫描次数降序排列的污染源列表
        var sortedSceneList: List<Pair<SceneInfo?, Int>>? = null
    }
    constructor(sceneInfoRep: SceneInfoRep) : this(sceneInfoRep, null)
    /**
     * 实时统计
     */
    inner class AnalysisStatistic {
        // 按照被扫描次数降序排列的污染源列表
        var sortedSceneList: List<Pair<SceneInfo?, Int>>? = null
    }
    // 污染线索
    var clueList = mutableListOf<PollutedClue>()
    // 最新实时走航监测数据
    val realTimeDataList = mutableListOf<BaseRealTimeData>()
    private val sceneInfoRep: SceneInfoRep
    // 未分析的污染线索
    val clueList = mutableListOf<PollutedClue>()
    private val config: RTExcWindLevelConfig
    // 已分析的污染线索
    private val historyClueList = mutableListOf<PollutedClue>()
    private val taskList = mutableListOf<BaseExceptionAnalysis<RTExcWindLevelConfig, PollutedClue>>()
    // 定时污染分析任务控制
    private var analysisTimer: Timer? = null
    fun initTask() {
        taskList.clear()
        taskList.apply {
            add(RTExcWindLevel1(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel1_1(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel4(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel6(config) { exceptionCallback(it) }.also { it.init() })
    // 定时污染分析任务
    private var lastAnalysisOnTimeTask:TimerTask? = null
    // 定时污染分析任务运行状态
    private var analysisTaskIsRunning = false
    // 上一次定时污染分析任务结束时间
    private lateinit var lastAnalysisTime: LocalDateTime
    init {
        clear()
    }
    // 新增一条污染线索
    fun addClue(pollutedClue: PollutedClue) {
        clueList.add(pollutedClue)
//        realTimeSummary()
        analysisOnClueCount()
    }
    // 刷新当前最新的走航监测数据
    fun refreshLatestMonitorData(data: BaseRealTimeData) {
//        realTimeDataList.clear()
        realTimeDataList.add(data)
    }
    fun clear() {
        realTimeDataList.clear()
        clueList.clear()
        historyClueList.clear()
        analysisTimer?.cancel()
        analysisTimer = null
        analysisTaskIsRunning = false
        lastAnalysisTime = LocalDateTime.now()
        resetAnalysisOnTime()
    }
    /**
     * 重置定时分析线索任务
     */
    private fun resetAnalysisOnTime() {
        // 取消原有的分析任务计时
        analysisTimer?.cancel()
        lastAnalysisOnTimeTask?.cancel()
        // 以当前时间为起点,重新开始新的一轮等待计时
        analysisTimer = Timer()
        val period = config.analysisPeriod * 60 * 1000L
        lastAnalysisOnTimeTask = newAnalysisTask()
        analysisTimer?.schedule(lastAnalysisOnTimeTask, period, period)
    }
    /**
     * 在定时污染线索分析任务等待周期时间内,若污染线索量超过设定值,直接触发分析线索任务
     * 并重置定时分析任务
     */
    private fun analysisOnClueCount() {
        if (clueList.size >= config.analysisCount && !analysisTaskIsRunning) {
            newAnalysisTask().run()
            resetAnalysisOnTime()
        }
    }
    /**
     * 计算新的一条实时走航数据
     * 实时线索统计
     */
    fun addOneData(data: BaseRealTimeData) {
        // 计算异常
        taskList.forEach { it.onNextData(data) }
        // 限定时间内没有新数据传入,则结束当前的计算
    private fun realTimeSummary() {
        val statistic = AnalysisStatistic()
        // 共有多少相关污染源,哪些污染源被扫描次数较多
        val sceneMap = mutableMapOf<String?, Pair<SceneInfo?, Int>>()
        clueList.forEach {c->
            c.pollutedSource?.sceneList?.forEach { s->
                if (!sceneMap.containsKey(s?.guid)) {
                    sceneMap[s?.guid] = s to 1
                } else {
                    sceneMap[s?.guid] = s to (sceneMap[s?.guid]?.second!! + 1)
                }
            }
        }
        val res = sceneMap.entries.sortedBy { it.value.second }
        statistic.sortedSceneList = res.map { it.value }
    }
    /**
     * 超时处理,较长时间没有新数据进入,进行初始化操作
     * 线索分析
     */
    private fun dealOnTimeout() {
        val timer = Timer(true)
        timer.schedule(object : TimerTask() {
            override fun run() {
                TODO("Not yet implemented")
    private fun analysis() {
        val result = AnalysisResult()
        // 共有多少相关污染源,哪些污染源被扫描次数较多
        val sceneMap = mutableMapOf<String?, Pair<SceneInfo?, Int>>()
        clueList.forEach {c->
            c.pollutedSource?.sceneList?.forEach { s->
                if (!sceneMap.containsKey(s?.guid)) {
                    sceneMap[s?.guid] = s to 1
                } else {
                    sceneMap[s?.guid] = s to (sceneMap[s?.guid]?.second!! + 1)
                }
            }
        }, 60 * 1000)
        timer.cancel()
        }
        val res = sceneMap.entries.sortedBy { it.value.second }
        result.sortedSceneList = res.map { it.value }
        // 当前的走航数据的定位和污染源距离是否是逐渐接近,若走航远离了主要污染源,提示用户调整走航路线
        // 线索分析完成后,移动至历史线索列表
        historyClueList.addAll(clueList)
        clueList.clear()
        realTimeDataList.clear()
        callback(result)
//        TODO()
    }
    // 数据突变异常回调
    private fun exceptionCallback(ex: PollutedClue) {
        // 溯源污染源信息
        ex.searchScenes(sceneInfoRep)
        clueList
        // 广播污染溯源异常结果
        UnderwayWebSocketSender.broadcast(GsonUtils.gson.toJson(ex))
    // 定时污染分析任务
    private fun newAnalysisTask(): TimerTask {
        return object : TimerTask() {
            override fun run() {
                // 记录任务运行状态
                analysisTaskIsRunning = true
                analysis()
                // 记录上一次的任务结束时间
                lastAnalysisTime = LocalDateTime.now()
                analysisTaskIsRunning = false
            }
        }
    }
}