feiyu02
2025-08-05 176d7d8283e66ccf63878c9ab823e900df94b748
src/main/kotlin/com/flightfeather/uav/biz/sourcetrace/RealTimeExceptionAnalysisController.kt
@@ -2,70 +2,188 @@
import com.flightfeather.uav.biz.FactorFilter
import com.flightfeather.uav.biz.dataanalysis.BaseExceptionAnalysis
import com.flightfeather.uav.biz.sourcetrace.exceptiontype.RealTimeExceptionSlideAverage
import com.flightfeather.uav.biz.sourcetrace.exceptiontype.RealTimeExceptionValueMutation
import com.flightfeather.uav.biz.sourcetrace.model.RealTimeExceptionResult
import com.flightfeather.uav.common.api2word.utils.JsonUtils
import com.flightfeather.uav.common.location.LocationRoadNearby
import com.flightfeather.uav.common.utils.GsonUtils
import com.flightfeather.uav.common.utils.MapUtil
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.domain.entity.SceneInfo
import com.flightfeather.uav.domain.entity.avg
import com.flightfeather.uav.domain.repository.RealTimeDataRep
import com.flightfeather.uav.domain.repository.SegmentInfoRep
import com.flightfeather.uav.domain.repository.SceneInfoRep
import com.flightfeather.uav.socket.eunm.FactorType
import com.flightfeather.uav.socket.handler.UnderwayWebSocketServerHandler
import com.google.gson.Gson
import com.flightfeather.uav.socket.sender.UnderwayWebSocketSender
import java.util.Timer
import java.util.TimerTask
import kotlin.math.PI
/**
 * 实时走航污染溯源
 * @date 2025/5/8
 * @author feiyu02
 */
class RealTimeExceptionAnalysisController (
    private val realTimeDataRep: RealTimeDataRep,
    private val locationRoadNearby: LocationRoadNearby,
    private val segmentInfoRep: SegmentInfoRep,
    private val underwayWebSocketServerHandler: UnderwayWebSocketServerHandler,
    factorFilter: FactorFilter
){
    private var config:RealTimeAnalysisConfig = RealTimeAnalysisConfig(factorFilter)
@Deprecated("2025.5.29, 逻辑与业务不匹配,后续删除")
class RealTimeExceptionAnalysisController {
    constructor(sceneInfoRep: SceneInfoRep, factorFilter: FactorFilter?) {
        this.sceneInfoRep = sceneInfoRep
        this.config = if (factorFilter != null)
            RealTimeAnalysisConfig(factorFilter)
        else
            RealTimeAnalysisConfig(
                FactorFilter.builder()
//                .withMain(FactorType.NO2)
                    .withMain(FactorType.CO)
//                .withMain(FactorType.H2S)
//                .withMain(FactorType.SO2)
//                .withMain(FactorType.O3)
                    .withMain(FactorType.PM25)
                    .withMain(FactorType.PM10)
                    .withMain(FactorType.VOC)
                    .create()
            )
        initTask()
    }
    constructor(sceneInfoRep: SceneInfoRep) : this(sceneInfoRep, null)
    private val sceneInfoRep: SceneInfoRep
    private val config: RealTimeAnalysisConfig
    private val taskList = mutableListOf<BaseExceptionAnalysis<RealTimeAnalysisConfig, RealTimeExceptionResult>>()
    private fun initTask(config: RealTimeAnalysisConfig) {
    fun initTask() {
        taskList.clear()
        taskList.apply {
            add(RealTimeExceptionValueMutation(config){ exceptionCallback(it)})
            add(
                RealTimeExceptionValueMutation(config) { exceptionCallback(it) }.also { it.init() }
            )
            add(
                RealTimeExceptionSlideAverage(config){ exceptionCallback(it)}.also { it.init() }
            )
        }
    }
    init {
        initTask(config)
    }
    // 计算历史任务
    /**
     * 计算新的一条实时走航数据
     */
    fun addOneData(data: BaseRealTimeData) {
        taskList
        // 计算异常
        taskList.forEach { it.onNextData(data) }
        // 限定时间内没有新数据传入,则结束当前的计算
    }
    /**
     * 超时处理,有两种超时情况
     * 1. 较短时间内,主动结束连续当前异常判断
     * 2. 较长时间内,进行初始化操作
     */
    private fun dealOnTimeout() {
        val timer = Timer(true)
        timer.schedule(object : TimerTask() {
            override fun run() {
                TODO("Not yet implemented")
            }
        }, 60 * 1000)
        timer.cancel()
    }
    // 数据突变异常回调
    private fun exceptionCallback(ex: RealTimeExceptionResult) {
        if (sourceTrace(ex, config)) {
            underwayWebSocketServerHandler.broadcast(GsonUtils.gson.toJson(ex))
        }
        // 溯源污染源信息
        sourceTrace(ex, config)
        // 广播污染溯源异常结果
        UnderwayWebSocketSender.broadcast(GsonUtils.gson.toJson(ex))
    }
    private fun sourceTrace(ex: RealTimeExceptionResult, config: RealTimeAnalysisConfig):Boolean {
    /**
     * 污染反向溯源
     */
    private fun sourceTrace(ex: RealTimeExceptionResult, config: RealTimeAnalysisConfig) {
        // 计算异常数据均值
        val avgData = ex.dataList.avg()
        if (avgData.windSpeed!! > config.sourceTraceWindSpeedLimit) {
            return false
            return
        }
        // 取中间点作为反向溯源的起点
        val midData = ex.dataList[ex.dataList.size / 2]
//        avgData.longitude
//        avgData.latitude
//        avgData.windDirection
        return false
        // 计算反向溯源区域
        val polygon = calSector(
            avgData.windSpeed!!.toDouble(),
            avgData.windDirection!!.toDouble(),
            midData.longitude!!.toDouble() to midData.latitude!!.toDouble(),
            config.sourceTraceTimeLimit,
            config.sourceTraceDegOffset
        )
        // 按照区域检索内部污染源信息
        // 1. 首先按照四至范围从数据库初步筛选污染源
        val fb = MapUtil.calFourBoundaries(polygon)
        val sceneList = sceneInfoRep.findByCoordinateRange(fb)
        // 2. 再精确判断是否在反向溯源区域多边形内部
        val result = mutableListOf<SceneInfo>()
        sceneList.forEach {
            // Fixme 2025.5.14: 污染源的坐标是高德地图坐标系(火星坐标系),而走航数据是WGS84坐标系
            val point = MapUtil.gcj02ToWgs84(it!!.longitude.toDouble() to it.latitude.toDouble())
            if (MapUtil.isPointInPolygon(point, polygon)) {
                result.add(it)
            }
        }
        // 更新中间点信息
        ex.midData = avgData.apply {
            dataTime = midData.dataTime
            createTime = midData.createTime
            longitude = midData.longitude
            latitude = midData.latitude
        }.toDataVo()
        // 更新溯源范围内的污染场景信息
        ex.relatedSceneList = result
    }
    /**
     * 根据中心点坐标、风向和风速,以及给定的夹角,计算以中心点按照风向风速和时长,向外扩散形成的扇形的点坐标
     * @param windSpeed 风速,单位:米/秒
     * @param windDir 风向,单位:度
     * @param center 中心点坐标经纬度
     * @param durationMin 时长,单位:分钟
     * @param defaultDegOffset 扩散偏移角度
     * @return 多边形顶点坐标集合
     */
    private fun calSector(
        windSpeed: Double, windDir: Double, center: Pair<Double, Double>, durationMin: Int,
        defaultDegOffset: Double = 30.0,
    ): List<Pair<Double, Double>> {
        val sDeg = windDir - defaultDegOffset
        val eDeg = windDir + defaultDegOffset
        val distance = windSpeed * durationMin * 60
        val result = mutableListOf(center)
        var startDeg = sDeg
        while (startDeg < eDeg) {
            val p = MapUtil.getPointByLen(center, distance, startDeg * PI / 180)
            result.add(p)
            startDeg++
        }
        return result
//        // 左侧(逆时针侧)顶点
//        val p1 = MapUtil.getPointByLen(center, distance, sDeg * PI / 180)
//        // 风向反向顶点
//        val p2 = MapUtil.getPointByLen(center, distance, windDir * PI / 180)
//        // 右侧(顺时针侧)顶点
//        val p3 = MapUtil.getPointByLen(center, distance, eDeg * PI / 180)
//
//        return listOf(center, p1, p2, p3)
    }
}