src/main/kotlin/com/flightfeather/uav/biz/datafetch/ShenXinDataFetch.kt
@@ -2,6 +2,7 @@ import com.flightfeather.uav.common.exception.BizException import com.flightfeather.uav.common.net.ShenXinService import com.flightfeather.uav.domain.entity.BaseRealTimeData import com.flightfeather.uav.domain.entity.Mission import com.flightfeather.uav.domain.repository.MissionRep import com.flightfeather.uav.domain.repository.RealTimeDataRep @@ -25,16 +26,17 @@ private val deviceStatusMap = ConcurrentHashMap<String, Boolean>() // 走航任务是否正在获取数据 private val missionStatusMap = ConcurrentHashMap<String, Boolean>() /** * 获取最新的数据 * @param deviceType 设备类型 * @param code 设备编号 */ private fun fetchLatestData(deviceType: UWDeviceType, code: String) { private fun fetchLatestData(deviceType: UWDeviceType, code: String): List<BaseRealTimeData> { if (deviceStatusMap.containsKey(code)) { // 设备正在获取实时数据,则直接返回,放弃本次请求 if (deviceStatusMap[code] == true) { return return emptyList() } // 否则开始获取数据,修改状态为true else { @@ -57,6 +59,7 @@ if (data.isNotEmpty()) { realTimeDataRep.saveData(deviceType, data) } return data } finally { // 设备完成数据获取后,修改状态为false deviceStatusMap[code] = false @@ -66,13 +69,15 @@ /** * 获取给定时间范围内的数据 */ fun fetchLatestData(deviceType: UWDeviceType, code: String, sTime: LocalDateTime?, eTime: LocalDateTime?) { fun fetchLatestData(deviceType: UWDeviceType, code: String, sTime: LocalDateTime?, eTime: LocalDateTime?) : List<BaseRealTimeData> { if (sTime != null && eTime != null) { val data = ShenXinService.fetchData(code, sTime, eTime) realTimeDataRep.deleteData(deviceType, code, sTime, eTime) realTimeDataRep.saveData(deviceType, data) return data } else if (sTime == null && eTime == null) { fetchLatestData(deviceType, code) return fetchLatestData(deviceType, code) } else { throw BizException("开始和结束时间需要都省略或者都填写") } src/main/kotlin/com/flightfeather/uav/biz/sourcetrace/SourceTraceController.kt
@@ -102,6 +102,16 @@
        dealOnTimeout()
    }

    /**
     * Feeds a batch of real-time records through the source-trace pipeline.
     *
     * Every record is handed to each task in `taskList` and is then used to refresh the
     * latest monitoring data held by `pollutedSummary`. After the whole batch has been
     * processed, `dealOnTimeout()` is invoked once.
     *
     * @param dataList records to process, handled in list order
     */
    fun addDataList(dataList: List<BaseRealTimeData>) {
        // Anomaly calculation: push each record through every task, then update the summary.
        for (record in dataList) {
            for (task in taskList) {
                task.onNextData(record)
            }
            pollutedSummary.refreshLatestMonitorData(record)
        }
        // Per the original note: if no new data arrives within the time limit,
        // the current calculation is ended.
        dealOnTimeout()
    }

    /**
     * Timeout handling: when no new data has arrived for a long time, perform initialization.
     */
src/main/kotlin/com/flightfeather/uav/biz/sourcetrace/model/PollutedArea.kt
@@ -77,6 +77,7 @@ try { val address = AMapService.reGeo(pair) this.address = address.district + address.township + address.street Thread.sleep(100) } catch (e: Exception) { e.printStackTrace() } src/main/kotlin/com/flightfeather/uav/lightshare/service/ThirdPartyService.kt
@@ -27,4 +27,9 @@ fun fetchLatestData( label: String, type: UWDeviceType, deviceCode: String, startTime: LocalDateTime?, endTime: LocalDateTime?, ): Boolean /** * 对历史走航任务进行快速动态溯源 */ fun sourceTrace(label: String, missionCode: String): Boolean } src/main/kotlin/com/flightfeather/uav/lightshare/service/impl/ThirdPartyServiceImpl.kt
@@ -1,12 +1,18 @@
package com.flightfeather.uav.lightshare.service.impl

import com.flightfeather.uav.biz.datafetch.ShenXinDataFetch
import com.flightfeather.uav.biz.sourcetrace.SourceTraceController
import com.flightfeather.uav.common.exception.BizException
import com.flightfeather.uav.domain.repository.MissionRep
import com.flightfeather.uav.domain.repository.RealTimeDataRep
import com.flightfeather.uav.domain.repository.SceneInfoRep
import com.flightfeather.uav.domain.repository.SourceTraceRep
import com.flightfeather.uav.lightshare.eunm.ThirdPartyLabel
import com.flightfeather.uav.lightshare.service.ThirdPartyService
import com.flightfeather.uav.socket.eunm.UWDeviceType
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap

/**
 *
@@ -14,7 +20,17 @@
 * @author feiyu02
 */
@Service
class ThirdPartyServiceImpl(private val shenXinDataFetch: ShenXinDataFetch) : ThirdPartyService {
class ThirdPartyServiceImpl(
    private val shenXinDataFetch: ShenXinDataFetch,
    private val sceneInfoRep: SceneInfoRep,
    private val sourceTraceRep: SourceTraceRep,
    private val missionRep: MissionRep,
    private val realTimeDataRep: RealTimeDataRep,
) : ThirdPartyService {

    // Real-time pollution source-trace processors for navigation monitoring, keyed per device.
    private val sourceTraceMap = ConcurrentHashMap<String?, SourceTraceController>()

    // Historical source-trace bookkeeping: mission code -> whether a trace has been claimed.
    private val historySourceTraceTask = ConcurrentHashMap<String, Boolean>()

    override fun fetchMissionData(label: String, missionCode: String): Boolean {
        when (label) {
@@ -31,10 +47,39 @@
    ): Boolean {
        when (label) {
            ThirdPartyLabel.ShenXin.value -> {
                shenXinDataFetch.fetchLatestData(type, deviceCode, startTime, endTime)
                val data = shenXinDataFetch.fetchLatestData(type, deviceCode, startTime, endTime)
                getSourceTraceCtrl(deviceCode)?.addDataList(data)
                return true
            }
            else -> throw BizException("第三方接口标识不存在")
        }
    }

    /**
     * Runs source tracing over the stored data of a historical navigation mission.
     *
     * A mission already marked `true` in [historySourceTraceTask] is skipped. The mark is
     * claimed atomically, so concurrent requests for the same mission cannot both start a
     * run. If the run fails, the mark is reset to `false` so the request can be retried
     * instead of being silently reported as done.
     *
     * @param label third-party interface label; only [ThirdPartyLabel.ShenXin] is handled
     * @param missionCode code of the mission to trace
     * @return always `true` when the label is recognised
     * @throws BizException if the label is unknown or the mission does not exist
     */
    override fun sourceTrace(label: String, missionCode: String): Boolean {
        when (label) {
            ThirdPartyLabel.ShenXin.value -> {
                // put() atomically returns the previous value; anything other than `true`
                // (absent or `false`) means this call is the one that claimed the mission.
                val alreadyClaimed = historySourceTraceTask.put(missionCode, true) == true
                if (!alreadyClaimed) {
                    try {
                        val stc = SourceTraceController(sceneInfoRep, sourceTraceRep)
                        val mission = missionRep.findOne(missionCode) ?: throw BizException("走航任务不存在")
                        val data = realTimeDataRep.fetchData(mission)
                        stc.addDataList(data)
                    } catch (e: Exception) {
                        // Release the claim; otherwise later calls would return true
                        // without ever tracing this mission.
                        historySourceTraceTask[missionCode] = false
                        throw e
                    }
                }
                return true
            }
            else -> throw BizException("第三方接口标识不存在")
        }
    }

    /**
     * Returns the source-trace controller registered under [key], creating it on first use.
     *
     * Each device has its own controller. Creation goes through `computeIfAbsent`, so
     * concurrent callers for the same key all receive the same instance.
     *
     * @param key lookup key; the visible caller passes the device code
     * @return the controller that incoming navigation data should be fed into
     */
    private fun getSourceTraceCtrl(key: String): SourceTraceController? {
        return sourceTraceMap.computeIfAbsent(key) {
            SourceTraceController(sceneInfoRep, sourceTraceRep)
        }
    }
}