package com.flightfeather.uav.socket.processor
|
|
import com.flightfeather.uav.biz.FactorFilter
|
import com.flightfeather.uav.biz.sourcetrace.RealTimeExceptionAnalysisController
|
import com.flightfeather.uav.common.location.LocationRoadNearby
|
import com.flightfeather.uav.domain.entity.BaseRealTimeData
|
import com.flightfeather.uav.model.epw.EPWDataPrep
|
import com.flightfeather.uav.domain.repository.AirDataRep
|
import com.flightfeather.uav.domain.repository.RealTimeDataRep
|
import com.flightfeather.uav.domain.repository.SegmentInfoRep
|
import com.flightfeather.uav.socket.bean.AirDataPackage
|
import com.flightfeather.uav.socket.decoder.AirDataDecoder
|
import com.flightfeather.uav.socket.decoder.DataPackageDecoder
|
import com.flightfeather.uav.socket.eunm.AirCommandUnit
|
import com.flightfeather.uav.socket.eunm.FactorType
|
import com.flightfeather.uav.socket.eunm.UWDeviceType
|
import com.flightfeather.uav.socket.handler.UnderwayWebSocketServerHandler
|
import io.netty.channel.ChannelHandlerContext
|
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.stereotype.Component
|
import java.text.SimpleDateFormat
|
import java.util.*
|
import javax.annotation.PostConstruct
|
|
/**
 * Handles the messages received over the socket from underway monitoring devices.
 *
 * For each text frame the processor verifies the BCC checksum, decodes the frame,
 * registers the device with its channel context, stores the data and hands the first
 * stored record to the real-time exception analysis controller.
 *
 * Date: 2019.8.27
 * @author riku
 */
@Component
class UnderwayProcessor(
    private val airDataRep: AirDataRep,
    private val realTimeDataRep: RealTimeDataRep,
    private val locationRoadNearby: LocationRoadNearby,
    private val segmentInfoRep: SegmentInfoRep,
    private val underwayWebSocketServerHandler: UnderwayWebSocketServerHandler,
) : BaseProcessor() {

    companion object {
        private const val TAG = "UAV"

        // Compiled once; used to split space-separated hex frames into byte tokens.
        private val WHITESPACE = Regex("\\s+")
    }

    private val airDataDecoder = AirDataDecoder.instance
    private val dataPackageDecoder = DataPackageDecoder()

    // Data pre-processing objects, one per device code.
    // NOTE(review): this is a plain mutable map; confirm that frames are never handled
    // concurrently on several threads.
    private val dataProcessMap = mutableMapOf<String?, EPWDataPrep>()

    // Real-time underway pollution source-tracing processor.
    private val realTimeExceptionAnalysisController =
        RealTimeExceptionAnalysisController(
            realTimeDataRep,
            locationRoadNearby,
            segmentInfoRep,
            underwayWebSocketServerHandler,
            FactorFilter.builder()
//                .withMain(FactorType.NO2)
                .withMain(FactorType.CO)
//                .withMain(FactorType.H2S)
//                .withMain(FactorType.SO2)
//                .withMain(FactorType.O3)
                .withMain(FactorType.PM25)
                .withMain(FactorType.PM10)
                .withMain(FactorType.VOC)
                .create()
        )

    override var tag: String = "走航监测"

    /**
     * Processes one text frame received from a device.
     *
     * Frames whose BCC check fails are logged to standard output and discarded.
     *
     * @param msg the received frame as text
     * @param ctx the channel context the frame arrived on; stored together with the device code
     */
    override fun dealStringMsg(msg: String, ctx: ChannelHandlerContext?) {
        if (!bccCheck(msg)) {
            println("------${TAG}数据BCC校验失败,舍弃 [${SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Date())}]")
            return
        }

        // Unpack only after the checksum has passed, so frames that are going to be
        // discarded are never handed to the decoder.
        val packageData = airDataDecoder.decode(msg)

        // Save
        deviceSession.saveDevice(packageData.deviceCode, ctx)
        saveToTxt(msg)
        saveToDataBase(packageData)?.firstOrNull()?.let {
            // Pass the underway data on to the exception processor.
            realTimeExceptionAnalysisController.addOneData(it)
        }
    }

    /**
     * Saves a decoded package to the database.
     *
     * @param dataPackage the decoded package
     * @return the result of [AirDataRep.savePrepData2] for air-data packages, or an empty
     * list when the package's command unit is anything else
     */
    fun saveToDataBase(dataPackage: AirDataPackage): List<BaseRealTimeData>? {
        return when (dataPackage.commandUnit) {
            AirCommandUnit.AirData.value -> {
                // Store the raw data in JSON format.
                airDataRep.saveAirData(dataPackage)
                // Every device has its own pre-processing object, created on first use.
                val prep = dataProcessMap.getOrPut(dataPackage.deviceCode) {
                    EPWDataPrep(UWDeviceType.getType(dataPackage.deviceCode))
                }
                // Smooth the data, then store it in the table matching the device type.
                val smoothed = prep.mDataPrep2(dataPackage)
                airDataRep.savePrepData2(smoothed)
            }

            else -> emptyList()
        }
    }

    /**
     * BCC (XOR) check.
     *
     * The last two tokens of the frame are concatenated and parsed as one hexadecimal
     * number; that value must equal the XOR of all preceding tokens.
     *
     * @param msg the received frame as text
     * @return `true` when the checksum matches; `false` when it does not match or when the
     * frame is malformed (fewer than two tokens, or a token that is not valid hexadecimal)
     */
    fun bccCheck(msg: String): Boolean {
        val tokens = mutableListOf<String>().apply {
            addAll(dataPackageDecoder.toStringList(msg))
        }
        // A frame without room for the two checksum tokens cannot be valid.
        if (tokens.size < 2) return false

        // Checksum carried by the frame.
        val expected = (tokens[tokens.lastIndex - 1] + tokens[tokens.lastIndex]).toIntOrNull(16)
            ?: return false

        // Checksum computed over everything except the two checksum tokens.
        var actual = 0x00
        for (token in tokens.subList(0, tokens.size - 2)) {
            val value = token.toIntOrNull(16) ?: return false
            actual = actual xor value
        }

        return expected == actual
    }

    /**
     * Converts a whitespace-separated string of hexadecimal byte values into a byte array.
     *
     * Leading, trailing and repeated whitespace is ignored; a blank string yields an
     * empty array.
     *
     * @param msg hexadecimal tokens, e.g. `"23 23 7f"`
     * @return one byte per token (the low 8 bits of each parsed value)
     * @throws NumberFormatException if a token is not valid hexadecimal
     */
    fun encodeToBytes(msg: String): ByteArray {
        val tokens = msg.trim().split(WHITESPACE).filter { it.isNotEmpty() }
        return ByteArray(tokens.size) { tokens[it].toInt(16).toByte() }
    }

    /**
     * Builds the hexadecimal text of a command frame addressed to [deviceCode].
     *
     * The frame consists of the fixed bytes `23 23 10`, the character codes of the device
     * code, a fixed tail whose last ten bytes are the ASCII text `AT+VERSION`, and a final
     * byte holding the XOR of all preceding bytes. Every byte is written as at least two
     * hexadecimal digits.
     *
     * @param deviceCode the target device code
     * @return the space-separated frame, or `null` when [deviceCode] is `null`
     */
    fun getDataPackage(deviceCode: String?): String? {
        if (deviceCode == null) return null
        // Example frame:
        // 23 23 7f 31 37 36 39 31 35 33 31 39 30 39 31 32 30 30 31 31 01 01 00 00 39
        val frame = mutableListOf("23", "23", "10")
        deviceCode.mapTo(frame) { it.toInt().toString(16).padStart(2, '0') }
        frame.addAll("01 01 00 00 00 0A 41 54 2B 56 45 52 53 49 4F 4E".split(" "))

        // Compute the BCC checksum over every byte written so far.
        val bcc = frame.fold(0x00) { acc, token -> acc xor token.toInt(16) }

        // Zero-pad so the checksum is written as a full two-digit byte like the rest.
        frame.add(bcc.toString(16).padStart(2, '0'))

        return frame.joinToString(" ")
    }
}
|