package com.flightfeather.uav.biz.sourcetrace
|
|
import com.flightfeather.uav.biz.FactorFilter
|
import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis
|
import com.flightfeather.uav.biz.sourcetrace.config.RTExcWindLevelConfig
|
import com.flightfeather.uav.biz.sourcetrace.exceptiontype.*
|
import com.flightfeather.uav.biz.sourcetrace.model.AnalysisResult
|
import com.flightfeather.uav.biz.sourcetrace.model.PollutedClue
|
import com.flightfeather.uav.biz.sourcetrace.model.PollutedSummary
|
import com.flightfeather.uav.domain.entity.BaseRealTimeData
|
import com.flightfeather.uav.domain.repository.SceneInfoRep
|
import com.flightfeather.uav.domain.repository.SourceTraceRep
|
import com.flightfeather.uav.socket.eunm.FactorType
|
import com.flightfeather.uav.socket.sender.MsgType
|
import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender
|
import java.util.*
|
|
/**
|
* 污染线索控制器
|
* @date 2025/5/27
|
* @author feiyu02
|
*/
|
class SourceTraceController {
|
|
|
/**
|
* 5. 污染源的被扫描次数
|
* 每一刻钟对历史线索进行统计,提出会商建议(离污染源较远、污染源数量、出现次数)、走航路线调整建议(离污染源较近、走航轨迹未接近溯源场景)
|
*/
|
|
constructor(
|
sceneInfoRep: SceneInfoRep,
|
sourceTraceRep: SourceTraceRep,
|
factorFilter: FactorFilter?,
|
isSearchAddress: Boolean,
|
) {
|
this.sceneInfoRep = sceneInfoRep
|
this.sourceTraceRep = sourceTraceRep
|
this.config = if (factorFilter != null) {
|
RTExcWindLevelConfig(factorFilter)
|
} else {
|
RTExcWindLevelConfig(
|
FactorFilter.builder()
|
.withMain(FactorType.NO2)
|
.withMain(FactorType.CO)
|
// .withMain(FactorType.H2S)
|
// .withMain(FactorType.SO2)
|
// .withMain(FactorType.O3)
|
.withMain(FactorType.PM25)
|
.withMain(FactorType.PM10)
|
// .withMain(FactorType.VOC)
|
.withMain(FactorType.NO)
|
.withCombination(
|
listOf(
|
listOf(FactorType.PM25, FactorType.PM10),
|
// listOf(FactorType.VOC, FactorType.CO),
|
listOf(FactorType.NO, FactorType.NO2),
|
)
|
)
|
.create()
|
)
|
}
|
this.config.isSearchAddress = isSearchAddress
|
|
pollutedSummary = PollutedSummary(config) { summaryCallback(it) }
|
newTask()
|
}
|
|
constructor(sceneInfoRep: SceneInfoRep, sourceTraceRep: SourceTraceRep, isSearchAddress: Boolean = true)
|
: this(sceneInfoRep, sourceTraceRep, null, isSearchAddress)
|
|
private val pollutedSummary: PollutedSummary
|
private val sceneInfoRep: SceneInfoRep
|
private val sourceTraceRep: SourceTraceRep
|
private val config: RTExcWindLevelConfig
|
private val timer = Timer()
|
private var timerTask: TimerTask? = null
|
|
private val taskList = mutableListOf<BaseExceptionAnalysis<RTExcWindLevelConfig, PollutedClue>>()
|
|
fun initTask() {
|
taskList.clear()
|
newTask()
|
pollutedSummary.clear()
|
}
|
|
private fun newTask() {
|
taskList.apply {
|
// add(RTExcSlideAverage(config) { dataChangeCallback(it) }.also { it.init() })
|
add(RTExcWindLevel1(config) { exceptionCallback(it) }.also { it.init() })
|
add(RTExcWindLevel1_1(config) { exceptionCallback(it) }.also { it.init() })
|
add(RTExcWindLevel4(config) { exceptionCallback(it) }.also { it.init() })
|
add(RTExcWindLevel6(config) { exceptionCallback(it) }.also { it.init() })
|
|
add(RTExcChangeRate1(config) { exceptionCallback(it) }.also { it.init() })
|
add(RTExcChangeRate4(config) { exceptionCallback(it) }.also { it.init() })
|
add(RTExcChangeRate6(config) { exceptionCallback(it) }.also { it.init() })
|
|
add(RTWarnChangeRate(config) { dataChangeCallback(it) }.also { it.init() })
|
add(RTWarnChangeRate2(config) { dataChangeCallback(it) }.also { it.init() })
|
}
|
}
|
|
/**
|
* 计算新的一条实时走航数据
|
*/
|
fun addOneData(data: BaseRealTimeData) {
|
// println("====================>")
|
// 计算异常
|
taskList.forEach { it.onNextData(data) }
|
pollutedSummary.refreshLatestMonitorData(data)
|
// 限定时间内没有新数据传入,则结束当前的计算
|
dealOnTimeout()
|
}
|
|
fun addDataList(dataList: List<BaseRealTimeData>) {
|
// 计算异常
|
dataList.forEach { data ->
|
taskList.forEach { it.onNextData(data) }
|
pollutedSummary.refreshLatestMonitorData(data)
|
}
|
// 限定时间内没有新数据传入,则结束当前的计算
|
dealOnTimeout()
|
}
|
|
/**
|
* 超时处理,较长时间没有新数据进入,进行初始化操作
|
*/
|
private fun dealOnTimeout() {
|
// val timer = Timer()
|
timerTask?.cancel()
|
timer.purge()
|
timerTask = object : TimerTask() {
|
override fun run() {
|
initTask()
|
}
|
}
|
timer.schedule(timerTask, 2 * 60 * 60 * 1000)
|
}
|
|
// 数据突变异常回调
|
private fun exceptionCallback(ex: PollutedClue) {
|
// 溯源污染源信息
|
ex.searchScenes(sceneInfoRep)
|
ex.msgType = MsgType.PolClue.value
|
|
// 广播污染溯源异常结果
|
UnderwayWebSocketSender.broadcast(MsgType.PolClue.value, ex)
|
sourceTraceRep.insert(MsgType.PolClue, ex)
|
|
// 记录污染线索
|
pollutedSummary.addClue(ex)
|
}
|
|
// 数据变化提醒回调
|
private fun dataChangeCallback(ex: PollutedClue) {
|
// 溯源污染源信息
|
ex.searchScenes(sceneInfoRep)
|
ex.msgType = MsgType.DataChange.value
|
|
// 广播数据变化提醒
|
UnderwayWebSocketSender.broadcast(MsgType.DataChange.value, ex)
|
sourceTraceRep.insert(MsgType.DataChange, ex)
|
}
|
|
private fun summaryCallback(ex: AnalysisResult) {
|
ex.msgType = MsgType.AnaResult.value
|
// 广播污染溯源异常结果
|
UnderwayWebSocketSender.broadcast(MsgType.AnaResult.value, ex)
|
sourceTraceRep.insert(ex)
|
}
|
}
|