| | |
| | | |
| | | import com.flightfeather.uav.biz.FactorFilter |
| | | import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis |
| | | import com.flightfeather.uav.biz.sourcetrace.RealTimeAnalysisConfig |
| | | import com.flightfeather.uav.biz.sourcetrace.config.RTExcWindLevelConfig |
| | | import com.flightfeather.uav.biz.sourcetrace.exceptiontype.* |
| | | import com.flightfeather.uav.biz.sourcetrace.model.AnalysisResult |
| | | import com.flightfeather.uav.biz.sourcetrace.model.PollutedClue |
| | | import com.flightfeather.uav.biz.sourcetrace.model.PollutedSummary |
| | | import com.flightfeather.uav.common.utils.GsonUtils |
| | | import com.flightfeather.uav.domain.entity.BaseRealTimeData |
| | | import com.flightfeather.uav.domain.repository.SceneInfoRep |
| | | import com.flightfeather.uav.domain.repository.SourceTraceRep |
| | | import com.flightfeather.uav.socket.eunm.FactorType |
| | | import com.flightfeather.uav.socket.sender.MsgType |
| | | import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender |
| | |
| | | * 每一刻钟对历史线索进行统计,提出会商建议(离污染源较远、污染源数量、出现次数)、走航路线调整建议(离污染源较近、走航轨迹未接近溯源场景) |
| | | */ |
| | | |
| | | constructor(sceneInfoRep: SceneInfoRep, factorFilter: FactorFilter?) { |
| | | constructor( |
| | | sceneInfoRep: SceneInfoRep, |
| | | sourceTraceRep: SourceTraceRep, |
| | | factorFilter: FactorFilter?, |
| | | isSearchAddress: Boolean, |
| | | ) { |
| | | this.sceneInfoRep = sceneInfoRep |
| | | this.config = if (factorFilter != null) |
| | | this.sourceTraceRep = sourceTraceRep |
| | | this.config = if (factorFilter != null) { |
| | | RTExcWindLevelConfig(factorFilter) |
| | | else |
| | | } else { |
| | | RTExcWindLevelConfig( |
| | | FactorFilter.builder() |
| | | // .withMain(FactorType.NO2) |
| | | // .withMain(FactorType.CO) |
| | | .withMain(FactorType.NO2) |
| | | .withMain(FactorType.CO) |
| | | // .withMain(FactorType.H2S) |
| | | // .withMain(FactorType.SO2) |
| | | // .withMain(FactorType.O3) |
| | | .withMain(FactorType.PM25) |
| | | .withMain(FactorType.PM10) |
| | | .withMain(FactorType.VOC) |
| | | // .withMain(FactorType.VOC) |
| | | .withMain(FactorType.NO) |
| | | .withCombination( |
| | | listOf( |
| | | listOf(FactorType.PM25, FactorType.PM10), |
| | | // listOf(FactorType.VOC, FactorType.CO), |
| | | listOf(FactorType.NO, FactorType.NO2), |
| | | ) |
| | | ) |
| | | .create() |
| | | ) |
| | | } |
| | | this.config.isSearchAddress = isSearchAddress |
| | | |
| | | pollutedSummary = PollutedSummary(config) { summaryCallback(it) } |
| | | newTask() |
| | | } |
| | | |
| | | constructor(sceneInfoRep: SceneInfoRep) : this(sceneInfoRep, null) |
| | | constructor(sceneInfoRep: SceneInfoRep, sourceTraceRep: SourceTraceRep, isSearchAddress: Boolean = true) |
| | | : this(sceneInfoRep, sourceTraceRep, null, isSearchAddress) |
| | | |
| | | private val pollutedSummary: PollutedSummary |
| | | |
| | | private val sceneInfoRep: SceneInfoRep |
| | | |
| | | private val sourceTraceRep: SourceTraceRep |
| | | private val config: RTExcWindLevelConfig |
| | | private val timer = Timer() |
| | | private var timerTask: TimerTask? = null |
| | | |
| | | private val taskList = mutableListOf<BaseExceptionAnalysis<RTExcWindLevelConfig, PollutedClue>>() |
| | | |
| | |
| | | |
| | | private fun newTask() { |
| | | taskList.apply { |
| | | // add(RTExcSlideAverage(config) { dataChangeCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel1(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel1_1(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel4(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcWindLevel6(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcChangeRate(config) { exceptionCallback(it) }.also { it.init() }) |
| | | |
| | | add(RTExcChangeRate1(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcChangeRate4(config) { exceptionCallback(it) }.also { it.init() }) |
| | | add(RTExcChangeRate6(config) { exceptionCallback(it) }.also { it.init() }) |
| | | |
| | | add(RTWarnChangeRate(config) { dataChangeCallback(it) }.also { it.init() }) |
| | | add(RTWarnChangeRate2(config) { dataChangeCallback(it) }.also { it.init() }) |
| | | } |
| | | } |
| | | |
| | |
| | | * 计算新的一条实时走航数据 |
| | | */ |
| | | fun addOneData(data: BaseRealTimeData) { |
| | | // println("====================>") |
| | | // 计算异常 |
| | | taskList.forEach { it.onNextData(data) } |
| | | pollutedSummary.refreshLatestMonitorData(data) |
| | | // 限定时间内没有新数据传入,则结束当前的计算 |
| | | dealOnTimeout() |
| | | } |
| | | |
| | | fun addDataList(dataList: List<BaseRealTimeData>) { |
| | | // 计算异常 |
| | | dataList.forEach { data -> |
| | | taskList.forEach { it.onNextData(data) } |
| | | pollutedSummary.refreshLatestMonitorData(data) |
| | | } |
| | | // 限定时间内没有新数据传入,则结束当前的计算 |
| | | dealOnTimeout() |
| | | } |
| | | |
| | | /** |
| | | * 超时处理,较长时间没有新数据进入,进行初始化操作 |
| | | */ |
| | | private fun dealOnTimeout() { |
| | | val timer = Timer(true) |
| | | timer.schedule(object : TimerTask() { |
| | | // val timer = Timer() |
| | | timerTask?.cancel() |
| | | timer.purge() |
| | | timerTask = object : TimerTask() { |
| | | override fun run() { |
| | | TODO("Not yet implemented") |
| | | initTask() |
| | | } |
| | | }, 60 * 1000) |
| | | timer.cancel() |
| | | } |
| | | timer.schedule(timerTask, 2 * 60 * 60 * 1000) |
| | | } |
| | | |
| | | // 数据突变异常回调 |
| | | private fun exceptionCallback(ex: PollutedClue) { |
| | | // 溯源污染源信息 |
| | | ex.searchScenes(sceneInfoRep) |
| | | ex.msgType = MsgType.PolClue.value |
| | | |
| | | // 广播污染溯源异常结果 |
| | | UnderwayWebSocketSender.broadcast(MsgType.PolClue.value, ex) |
| | | sourceTraceRep.insert(MsgType.PolClue, ex) |
| | | |
| | | // 记录污染线索 |
| | | pollutedSummary.addClue(ex) |
| | | } |
| | | |
| | | private fun summaryCallback(ex: PollutedSummary.AnalysisResult) { |
| | | // 数据变化提醒回调 |
| | | private fun dataChangeCallback(ex: PollutedClue) { |
| | | // 溯源污染源信息 |
| | | ex.searchScenes(sceneInfoRep) |
| | | ex.msgType = MsgType.DataChange.value |
| | | |
| | | // 广播数据变化提醒 |
| | | UnderwayWebSocketSender.broadcast(MsgType.DataChange.value, ex) |
| | | sourceTraceRep.insert(MsgType.DataChange, ex) |
| | | } |
| | | |
| | | private fun summaryCallback(ex: AnalysisResult) { |
| | | ex.msgType = MsgType.AnaResult.value |
| | | // 广播污染溯源异常结果 |
| | | UnderwayWebSocketSender.broadcast(MsgType.AnaResult.value, ex) |
| | | sourceTraceRep.insert(ex) |
| | | } |
| | | } |