riku
6 天以前 20b8d870efbbb89564b599561fc69202ba41223f
src/main/kotlin/com/flightfeather/uav/socket/processor/UnderwayProcessor.kt
@@ -1,26 +1,24 @@
package com.flightfeather.uav.socket.processor
import com.flightfeather.uav.biz.FactorFilter
import com.flightfeather.uav.biz.sourcetrace.RealTimeExceptionAnalysisController
import com.flightfeather.uav.common.location.LocationRoadNearby
import com.flightfeather.uav.biz.sourcetrace.SourceTraceController
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.model.epw.EPWDataPrep
import com.flightfeather.uav.domain.repository.AirDataRep
import com.flightfeather.uav.domain.repository.RealTimeDataRep
import com.flightfeather.uav.domain.repository.SegmentInfoRep
import com.flightfeather.uav.domain.repository.SceneInfoRep
import com.flightfeather.uav.domain.repository.SourceTraceRep
import com.flightfeather.uav.lightshare.bean.DataVo
import com.flightfeather.uav.socket.bean.AirDataPackage
import com.flightfeather.uav.socket.decoder.AirDataDecoder
import com.flightfeather.uav.socket.decoder.DataPackageDecoder
import com.flightfeather.uav.socket.eunm.AirCommandUnit
import com.flightfeather.uav.socket.eunm.FactorType
import com.flightfeather.uav.socket.eunm.UWDeviceType
import com.flightfeather.uav.socket.handler.UnderwayWebSocketServerHandler
import io.netty.channel.ChannelHandlerContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.format.annotation.DateTimeFormat
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.*
import javax.annotation.PostConstruct
/**
 * 处理socket接收的消息
@@ -31,10 +29,8 @@
@Component
class UnderwayProcessor(
    private val airDataRep: AirDataRep,
    private val realTimeDataRep: RealTimeDataRep,
    private val locationRoadNearby: LocationRoadNearby,
    private val segmentInfoRep: SegmentInfoRep,
    private val underwayWebSocketServerHandler: UnderwayWebSocketServerHandler,
    private val sceneInfoRep: SceneInfoRep,
    private val sourceTraceRep: SourceTraceRep
) : BaseProcessor() {
    companion object {
@@ -48,23 +44,7 @@
    // Per-device data pre-processing state, keyed by device code.
    private val dataProcessMap = mutableMapOf<String?, EPWDataPrep>()
    // Real-time underway (mobile monitoring) pollution source-tracing processor
    private val realTimeExceptionAnalysisController =
        RealTimeExceptionAnalysisController(
            realTimeDataRep,
            locationRoadNearby,
            segmentInfoRep,
            underwayWebSocketServerHandler,
            // Main factors passed to the controller; the commented-out factors are currently disabled.
            FactorFilter.builder()
//                .withMain(FactorType.NO2)
                .withMain(FactorType.CO)
//                .withMain(FactorType.H2S)
//                .withMain(FactorType.SO2)
//                .withMain(FactorType.O3)
                .withMain(FactorType.PM25)
                .withMain(FactorType.PM10)
                .withMain(FactorType.VOC)
                .create()
        )
    // Per-device source-trace controllers, keyed by device code.
    private val sourceTraceMap = mutableMapOf<String?, SourceTraceController>()
    override var tag: String = "走航监测"
@@ -76,11 +56,17 @@
            //保存
            deviceSession.saveDevice(packageData.deviceCode, ctx)
            saveToTxt(msg)
            saveToDataBase(packageData)?.takeIf { it.isNotEmpty() }?.get(0)?.let {
            val res = saveToDataBase(packageData)
            println("----写入完成")
            res?.takeIf { it.isNotEmpty() }?.get(0)?.let {
                // 每台设备有各自单独的异常数据处理器
                if (!sourceTraceMap.containsKey(it.deviceCode)) {
                    sourceTraceMap[it.deviceCode] = SourceTraceController(sceneInfoRep, sourceTraceRep)
                }
                // 将走航数据传入异常处理器
                realTimeExceptionAnalysisController.addOneData(it)
                sourceTraceMap[it.deviceCode]?.addOneData(it)
                println("----动态溯源完成")
            }
        } else {
            println("------${TAG}数据BCC校验失败,舍弃 [${SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Date())}]")
        }
@@ -92,6 +78,9 @@
    fun saveToDataBase(dataPackage: AirDataPackage): List<BaseRealTimeData>? {
        when (dataPackage.commandUnit) {
            AirCommandUnit.AirData.value -> {
                val dataVo = dataPackage.toDataVo()
                // 存储前判断数据是否有效
                if (!isValid(dataVo)) return null
                // 以json格式存储原始数据
                airDataRep.saveAirData(dataPackage)
                // 进行预处理后,存储至对应数据表
@@ -100,7 +89,7 @@
                    dataProcessMap[dataPackage.deviceCode] = EPWDataPrep(UWDeviceType.getType(dataPackage.deviceCode))
                }
                return dataProcessMap[dataPackage.deviceCode]?.run {
                    val list = this.mDataPrep2(dataPackage)// 数据平滑处理
                    val list = this.mDataPrep2(dataVo)// 数据平滑处理
                    airDataRep.savePrepData2(list)// 按照设备类型存储至对应数据表
                }
            }
@@ -165,4 +154,14 @@
        return sb.toString()
    }
    /**
     * 数据有效性判断
     */
    private fun isValid(dataVo: DataVo): Boolean {
        if (dataVo.time == null) return false
        val check1 = LocalDateTime.parse(dataVo.time!!, DateTimeFormatter.ofPattern("yyy-MM-dd HH:mm:ss"))
            .isBefore(LocalDateTime.now())
        return check1
    }
}