feiyu02
2025-09-30 94fee0b511279679b43e210878d3d36e5a14384b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package com.flightfeather.uav.biz.sourcetrace
 
import com.flightfeather.uav.biz.FactorFilter
import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis
import com.flightfeather.uav.biz.sourcetrace.config.RTExcWindLevelConfig
import com.flightfeather.uav.biz.sourcetrace.exceptiontype.*
import com.flightfeather.uav.biz.sourcetrace.model.AnalysisResult
import com.flightfeather.uav.biz.sourcetrace.model.PollutedClue
import com.flightfeather.uav.biz.sourcetrace.model.PollutedSummary
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.domain.repository.SceneInfoRep
import com.flightfeather.uav.domain.repository.SourceTraceRep
import com.flightfeather.uav.socket.eunm.FactorType
import com.flightfeather.uav.socket.sender.MsgType
import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender
import java.util.*
 
/**
 * Pollution clue controller.
 *
 * Feeds real-time monitoring records to a fixed set of analysis tasks, and for every
 * clue they report: looks up the related scenes, broadcasts the clue through
 * [UnderwayWebSocketSender] and stores it through [SourceTraceRep]. Exception clues are
 * additionally handed to a [PollutedSummary], whose own results are broadcast and
 * stored the same way.
 *
 * Thread-safety: [addOneData], [addDataList] and [initTask] are synchronized on this
 * instance, because the idle-timeout reset runs on a timer thread while data is fed
 * from the caller's thread.
 *
 * @date 2025/5/27
 * @author feiyu02
 */
class SourceTraceController {

    /*
     * Design note carried over from the original author:
     * 5. Number of times a pollution source has been scanned.
     * Every quarter of an hour the historical clues are summarised in order to produce
     * consultation suggestions (far from the pollution source, number of pollution sources,
     * number of occurrences) and cruise-route adjustment suggestions (close to the pollution
     * source, cruise track has not yet approached the traced scene).
     */

    private val pollutedSummary: PollutedSummary
    private val sceneInfoRep: SceneInfoRep
    private val sourceTraceRep: SourceTraceRep
    private val config: RTExcWindLevelConfig

    // Daemon thread: a pending idle-reset must never keep the JVM alive on its own.
    private val timer = Timer("source-trace-idle-reset", true)

    // The most recently armed idle-reset task; only this one is allowed to reset the state.
    private var timerTask: TimerTask? = null

    private val taskList = mutableListOf<BaseExceptionAnalysis<RTExcWindLevelConfig, PollutedClue>>()

    /**
     * @param sceneInfoRep repository handed to each clue's scene lookup
     * @param sourceTraceRep repository every clue and analysis result is inserted into
     * @param factorFilter factors to analyse; when `null` the built-in default filter is used
     * @param isSearchAddress copied to [RTExcWindLevelConfig.isSearchAddress]
     */
    constructor(
        sceneInfoRep: SceneInfoRep,
        sourceTraceRep: SourceTraceRep,
        factorFilter: FactorFilter?,
        isSearchAddress: Boolean,
    ) {
        this.sceneInfoRep = sceneInfoRep
        this.sourceTraceRep = sourceTraceRep
        this.config = RTExcWindLevelConfig(factorFilter ?: defaultFactorFilter())
        this.config.isSearchAddress = isSearchAddress

        pollutedSummary = PollutedSummary(config) { summaryCallback(it) }
        newTask()
    }

    /**
     * Same as the primary form, using the built-in default factor filter.
     */
    constructor(sceneInfoRep: SceneInfoRep, sourceTraceRep: SourceTraceRep, isSearchAddress: Boolean = true)
            : this(sceneInfoRep, sourceTraceRep, null, isSearchAddress)

    /**
     * Drops all analysis tasks, creates a fresh set and clears the clue summary.
     */
    @Synchronized
    fun initTask() {
        taskList.clear()
        newTask()
        pollutedSummary.clear()
    }

    /**
     * Creates and initialises the analysis tasks and appends them to [taskList].
     * Each task is initialised right after it is constructed, before the next one is built.
     */
    private fun newTask() {
        with(taskList) {
//            add(RTExcSlideAverage(config) { dataChangeCallback(it) }.also { it.init() })
            // Exception detection: reported clues are also recorded in the summary.
            add(RTExcWindLevel1(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel1_1(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel4(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcWindLevel6(config) { exceptionCallback(it) }.also { it.init() })

            add(RTExcChangeRate1(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcChangeRate4(config) { exceptionCallback(it) }.also { it.init() })
            add(RTExcChangeRate6(config) { exceptionCallback(it) }.also { it.init() })

            // Data-change warnings: broadcast and stored, but not recorded in the summary.
            add(RTWarnChangeRate(config) { dataChangeCallback(it) }.also { it.init() })
            add(RTWarnChangeRate2(config) { dataChangeCallback(it) }.also { it.init() })
        }
    }

    /**
     * Processes one new real-time cruise monitoring record.
     */
    @Synchronized
    fun addOneData(data: BaseRealTimeData) {
        dispatch(data)
        // If no new data arrives within the timeout, the current computation is ended.
        dealOnTimeout()
    }

    /**
     * Processes a batch of records in list order. The idle timeout is re-armed once,
     * after the whole batch (also when the batch is empty).
     */
    @Synchronized
    fun addDataList(dataList: List<BaseRealTimeData>) {
        dataList.forEach { dispatch(it) }
        // If no new data arrives within the timeout, the current computation is ended.
        dealOnTimeout()
    }

    /**
     * Runs every analysis task on [data], then refreshes the summary's latest monitor data.
     */
    private fun dispatch(data: BaseRealTimeData) {
        taskList.forEach { it.onNextData(data) }
        pollutedSummary.refreshLatestMonitorData(data)
    }

    /**
     * Timeout handling: (re)arms the idle timer. When no new data has come in for
     * [IDLE_TIMEOUT_MILLIS], the controller is re-initialised via [initTask].
     */
    private fun dealOnTimeout() {
        timerTask?.cancel()
        timer.purge()
        val resetTask = object : TimerTask() {
            override fun run() {
                resetIfStillArmed(this)
            }
        }
        timerTask = resetTask
        timer.schedule(resetTask, IDLE_TIMEOUT_MILLIS)
    }

    /**
     * Performs the idle reset unless [task] has been superseded in the meantime.
     * `TimerTask.cancel()` cannot stop a task that has already started running, so a
     * task that lost the race against fresh data must not wipe the state built from it.
     */
    @Synchronized
    private fun resetIfStillArmed(task: TimerTask) {
        if (timerTask === task) {
            initTask()
        }
    }

    // Callback for abrupt-data-change exceptions.
    private fun exceptionCallback(ex: PollutedClue) {
        publishClue(ex, MsgType.PolClue)
        // Record the pollution clue for the summary.
        pollutedSummary.addClue(ex)
    }

    // Callback for data-change reminders.
    private fun dataChangeCallback(ex: PollutedClue) {
        publishClue(ex, MsgType.DataChange)
    }

    /**
     * Shared clue pipeline: look up the scenes for the clue, tag it with the message
     * type, broadcast it and store it.
     */
    private fun publishClue(clue: PollutedClue, type: MsgType) {
        clue.searchScenes(sceneInfoRep)
        clue.msgType = type.value

        UnderwayWebSocketSender.broadcast(type.value, clue)
        sourceTraceRep.insert(type, clue)
    }

    // Callback for summary analysis results: tag, broadcast and store.
    private fun summaryCallback(ex: AnalysisResult) {
        ex.msgType = MsgType.AnaResult.value
        UnderwayWebSocketSender.broadcast(MsgType.AnaResult.value, ex)
        sourceTraceRep.insert(ex)
    }

    private companion object {
        /** Idle time without new data after which the controller resets itself: 2 hours. */
        private const val IDLE_TIMEOUT_MILLIS = 2L * 60 * 60 * 1000

        /**
         * Filter used when the caller supplies none: main factors NO2, CO, PM25, PM10 and NO,
         * with the combinations (PM25, PM10) and (NO, NO2). The commented-out entries are
         * factors the original author left disabled.
         */
        private fun defaultFactorFilter() =
            FactorFilter.builder()
                .withMain(FactorType.NO2)
                .withMain(FactorType.CO)
//                .withMain(FactorType.H2S)
//                .withMain(FactorType.SO2)
//                .withMain(FactorType.O3)
                .withMain(FactorType.PM25)
                .withMain(FactorType.PM10)
//                .withMain(FactorType.VOC)
                .withMain(FactorType.NO)
                .withCombination(
                    listOf(
                        listOf(FactorType.PM25, FactorType.PM10),
//                        listOf(FactorType.VOC, FactorType.CO),
                        listOf(FactorType.NO, FactorType.NO2),
                    )
                )
                .create()
    }
}