riku
2025-08-28 3bb4fb15c664d29d179083698fdad35a661b1d7f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
package com.flightfeather.uav.biz.dataanalysis
 
import com.flightfeather.uav.biz.FactorFilter
import com.flightfeather.uav.biz.dataanalysis.model.ExceptionTag
import com.flightfeather.uav.biz.sourcetrace.model.RemainException
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.lightshare.eunm.ExceptionStatusType
import com.flightfeather.uav.socket.eunm.FactorType
import java.time.Duration
 
/**
 * 连续类型的异常分析基类,适用于当前数据与相邻数据之间有关联关系的情况
 */
abstract class BaseExceptionContinuous<T : ExceptionTag, V : BaseAnalysisConfig, Y : BaseExceptionResult>(
    config: V, private val tagClz: Class<T>,
) : BaseExceptionAnalysis<V, Y>(config) {
 
    enum class JudgeMethod(val des: String) {
        M1("在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立"),
        M2("要求数据不间断连续出现N次异常后,认为该异常成立"),
    }
 
    companion object {
        // 记录异常数据段时,分别向起始前和末尾后额外记录的数据个数偏移量
        private const val OFFSET = 10
    }
 
    protected val tagMap = mutableMapOf<FactorType, T>()
 
    // 起始数据与末尾数据间隔
    open var durationCount = 1
 
    // 末尾数据对象
    protected var lastData: BaseRealTimeData? = null
 
    // 最新的一组异常,记录单因子异常
    val latestExceptions = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
 
    /**
     * 最新的一组合并异常,根据配置参数从[latestExceptions]单因子异常中,合并异常
     */
    protected val latestCombinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
 
    // 记录需要延迟数据周期进行合并的异常
    val remainingExceptions = mutableListOf<RemainException<T>>()
 
    /**
     * 异常结果
     */
    val result = mutableListOf<Y>()
 
    /**
     * 不适用于此异常类型的监测因子
     */
    open var excludedFactor: List<FactorType> = emptyList()
 
    abstract var judgeMethod: JudgeMethod
 
    /**
     * 立即判断:当出现异常时,缓存异常数据的同时,立即对已有异常进行判断是否满足异常结果要求
     */
    open fun immeExcCheck(tag: T, factorType: FactorType): Boolean {
        return false
    }
 
    /**
     * 判断相邻数据是否连续
     */
    open fun isContinuous(d1: BaseRealTimeData?, d2: BaseRealTimeData?): Boolean {
        if (d1 == null || d2 == null) return true
 
        val t1 = d1.dataTime
        val t2 = d2.dataTime
        return Duration.between(t1?.toInstant(), t2?.toInstant()).toMillis() <= (20 * 1000)
    }
 
    /**
     * 判断数据量级在异常判断的范围内
     * 默认所有量级都在异常判断的范围内
     */
    open fun judgeDataScale(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> {
        val res = mutableMapOf<FactorType, Boolean>()
        config.factorFilter.mainList().forEach { f -> res[f] = true }
        return res
    }
 
    /**
     * 判断前后数据是否满足异常条件
     */
    abstract fun judgeException(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean>
 
    /**
     * 判断异常出现的连续个数是否满足条件
     * @param tag 异常数据对象
     */
    abstract fun judgeExceptionCount(tag: T, factorType: FactorType?): Boolean
 
    /**
     * 判断监测因子是否出现异常
     */
    open fun judge(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> {
        val jds = judgeDataScale(p, n)
        val jex = judgeException(p, n)
        val res = mutableMapOf<FactorType, Boolean>()
        jds.forEach { (t, u) ->
            res[t] = u && jex[t] ?: false
        }
        return res
    }
 
    /**
     * 异常数据的截取判断
     * @return
     */
    open fun needCut(tag: T, hasException: Boolean?, data: BaseRealTimeData): Boolean {
        // 默认判断条件为 当异常不再重复出现时
        return tag.exceptionExisted && hasException == false
    }
 
    override fun init() {
        super.init()
        lastData = null
        tagMap.clear()
        config.factorFilter.mainList().forEach { f ->
            tagMap[f] = tagClz.newInstance()
        }
    }
 
    override fun onNextData(data: BaseRealTimeData) {
        val isContinue = isContinuous(lastData, data)
        val hasException = judge(lastData, data)
        config.factorFilter.selectedList.forEach { s ->
            val f = s.main
            // 排除此异常类型不适用的监测因子
            if (excludedFactor.contains(f)) return@forEach
 
            tagMap[f]?.let {
                it.eIndex++
                // 起始数据
                it.endData = data
                if (it.startData == null) {
                    it.refreshWithNextException(data)
                }
 
                // 按照不同的方式进行异常判断
                when (judgeMethod) {
                    JudgeMethod.M1 -> judgeMethod1(hasException, f, it, data, s)
                    JudgeMethod.M2 -> judgeMethod2(isContinue, hasException, f, it, data, s)
                }
 
                it.addHistoryData(data)
            }
        }
        lastData = data
 
        removeSingleFactor(data)
        val fittedComb = checkDelayedExceptions(data)
        mergeExceptionResult(data, fittedComb)
        onNewResult(result)
        clearExceptions(data)
    }
 
    override fun onDone() {
        checkResult(exceptionStatus = ExceptionStatusType.Ended)
    }
 
    /**
     * 数据异常判断方式一
     * 在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立
     */
    private fun judgeMethod1(
        hasException: MutableMap<FactorType, Boolean>,
        f: FactorType,
        tag: T,
        data: BaseRealTimeData,
        s: FactorFilter.SelectedFactor,
    ) {
        // 出现异常
        if (hasException[f] == true) {
            // 判断数据在空间和时间变化上是否超出限定范围,若超出则删除遗留的异常记录,刷新起始点数据
            if (needCut(tag, hasException[f], data)) {
                tag.refreshWithNextException(data)
            }
            // 记录异常数据
            tag.addExceptionData(data)
            // 当立即判断通过时,形成异常结果
            if (immeExcCheck(tag, f)) {
                recordException(s, tag, data)
            }
        }
        // 数据正常,并且没有历史异常数据时,刷新起始点数据
        else if (!tag.exceptionExisted) {
            tag.refreshWithNextException(data)
        }
    }
 
    /**
     * 数据异常判断方式二
     * 要求数据不间断连续出现N次异常后,认为该异常成立
     */
    private fun judgeMethod2(
        isContinue: Boolean,
        hasException: MutableMap<FactorType, Boolean>,
        f: FactorType,
        tag: T,
        data: BaseRealTimeData,
        s: FactorFilter.SelectedFactor,
    ) {
        // 当相邻数据时间不连续时,刷新起始点数据,移除历史异常记录
        if (!isContinue) {
            tag.refreshWithNextException(data)
        }
        // 出现异常
        else if (hasException[f] == true) {
            // 有异常出现时,记录异常数据
            tag.addExceptionData(data)
            // 当立即判断通过时,形成异常结果
            if (immeExcCheck(tag, f)) {
                recordException(s, tag, data)
            }
        }
        // 数据正常,刷新起始点数据,移除历史异常记录
        else {
            tag.refreshWithNextException(data)
        }
    }
 
    /**
     * 异常结束,记录异常
     * 判断已有的异常数据是否满足异常条件,满足则记录,不满足则略过
     */
    fun recordException(factor: FactorFilter.SelectedFactor, tag: T, data: BaseRealTimeData) {
        checkResult(factor, ExceptionStatusType.Ended)
//        tag.refreshWithNextException(data)
    }
 
    /**
     * 检查连续异常结束时,是否符合异常存储条件
     */
    open fun checkResult(
        factor: FactorFilter.SelectedFactor? = null,
        exceptionStatus: ExceptionStatusType = ExceptionStatusType.InProgress,
    ) {
        val tag = tagMap[factor?.main]
        if (factor != null && tag != null) {
            if (tag.exceptionExisted && judgeExceptionCount(tag, factor.main)) {
                onNewException(tag, factor, exceptionStatus)
            }
        } else {
            config.factorFilter.selectedList.forEach { f ->
                val tag1 = tagMap[f.main] ?: return@forEach
                if (tag1.exceptionExisted && judgeExceptionCount(tag1, f.main)) {
                    onNewException(tag1, f, exceptionStatus)
                }
            }
        }
    }
 
    /**
     * 新增或更新一条异常
     */
    open fun onNewException(tag: T, factor: FactorFilter.SelectedFactor, exceptionStatus: ExceptionStatusType) {
        if (tag.startData == null) return
//        val ex = newResult(tag.startData!!, tag.endData, factor, tag.exceptionData)
//        val ex = newResult(tag, factor)
//            .apply { status = exceptionStatus.value }
//        // 异常已创建时,更新异常信息
//        if (tag.exceptionCreated) {
//            // 将最新的异常的guid赋值给ex
//            val lastEx = tag.exceptionResult.last()
//            ex.guid = lastEx.guid
//            tag.exceptionResult.removeLast()
//            tag.exceptionResult.add(ex)
//        }
//        // 异常未创建时,新建异常信息
//        else {
//            tag.exceptionResult.add(ex)
//            tag.exceptionCreated = true
//        }
//        val tagClone = tagClz.newInstance()
//        BeanUtils.copyProperties(tag, tagClone)
        latestExceptions.add(factor to tag)
    }
 
    /**
     * 将不在关联关系中的监测因子异常存储,并剔除
     */
    fun removeSingleFactor(data: BaseRealTimeData) {
        if (latestExceptions.isEmpty()) return
 
        // 查找不在因子关联组合中的异常因子
        val sfList = latestExceptions.filter {
            config.factorFilter.combination.find { c -> c.find { f -> f == it.first.main } != null } == null
        }
        // 生成对应的异常结果,并初始化该异常
        sfList.forEach {
            result.add(newResult(listOf(it)))
            it.second.refreshWithNextException(data)
        }
        // 剔除
        latestExceptions.removeAll(sfList)
    }
 
    /**
     * 检查延迟的待合并异常与当前异常是否能匹配
     * 1. 将遗留的超过等待数据周期的异常存储
     * 2. 将匹配成功的合并异常存储,同时将关联关系标记为已匹配
     * 3. 保留依旧未合并成功并且可继续等待的异常
     * @return 被匹配成功的关联关系
     */
    fun checkDelayedExceptions(data: BaseRealTimeData): List<List<FactorType>> {
        if (latestExceptions.isEmpty()) return emptyList()
 
        // 被匹配成功的监测因子关联关系
        val fittedComb = mutableListOf<List<FactorType>>()
        // 遗留的进入下一个数据周期做判断的待合并异常集合
        val leftExc = mutableListOf<RemainException<T>>()
        // 成功匹配的合并异常集合
        val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
        // 本次数据周期中,被匹配成功的异常集合
        val exceps = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
        remainingExceptions.forEach {
            // 检查本次数据周期的异常中,是否包含因子关联关系中的异常
            val combRes = matchCombFactor(it.combination, latestExceptions)
            val res = combRes.second
            // 判断本次数据周期中找到的因子和已有的因子是否满足关联关系
            val findFactors = mutableListOf<FactorType>()
            res.forEach {r -> findFactors.add(r.first.main) }
            it.exceptions.forEach {r -> findFactors.add(r.first.main) }
            val isFitAll = findFactors.distinct() == it.combination
            // 如果已经没有缺失的异常因子,则可合并为组合异常
            if (isFitAll) {
                fittedComb.add(it.combination)
                // 将查找结果添加至已有异常集合中
                it.addExceptions(res)
//                // 记录被匹配成功的异常
//                res.forEach { r->
//                    if (exceps.find { e -> e.second == r.second } == null) {
//                        exceps.add(r)
//                    }
//                }
                // 将合并异常存储
                combinedExc.add(it.exceptions)
 
            }
            // 否则留作下次数据周期再判定存入待合并异常集合
            else {
                it.period++
                // 当待合并的异常等待数据周期大于设定值时,不再等待,直接输出异常
                if (it.period > config.maxDelayPeriod) {
                    result.add(newResult(it.exceptions))
                    return@forEach
                } else {
                    fittedComb.add(it.combination)
                    // 将查找结果添加至已有异常集合中
                    it.addExceptions(res)
//                    // 记录被匹配成功的异常
//                    res.forEach { r->
//                        if (exceps.find { e -> e.second == r.second } == null) {
//                            exceps.add(r)
//                        }
//                    }
                    leftExc.add(it)
                }
            }
        }
        // 存储合并异常
        combinedExc.forEach {
            result.add(newResult(it))
        }
//        // 将被匹配成功的异常刷新,并从本次数据周期的异常集合中移除
//        exceps.forEach { r-> r.second.refreshWithNextException(data) }
//        latestExceptions.removeAll(exceps)
        // 保留未匹配的组合
        remainingExceptions.clear()
        remainingExceptions.addAll(leftExc)
 
        return fittedComb
    }
 
    /**
     * 合并异常
     * @param data 当前监测数据
     * @param fittedComb 在遗留的异常[remainingExceptions]判断中,已经进行匹配判断的关联关系,将不再进行匹配
     */
    open fun mergeExceptionResult(data: BaseRealTimeData, fittedComb: List<List<FactorType>>) {
        if (latestExceptions.isEmpty()) return
 
        val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
        // 遍历所有的因子组合
        config.factorFilter.combination.forEach { c ->
            /**
             * 跳过已经在[checkDelayedExceptions]中判断过的关联关系
             */
            if (fittedComb.indexOf(c) >= 0) return@forEach
 
            val combRes = matchCombFactor(c, latestExceptions)
            val res = combRes.second
            val exist = combRes.first
            // 如果组合内的所有因子都存在异常,则存储为合并异常
            if (exist) {
                // 将合并异常存储
                combinedExc.add(res)
            }
            // 否则将异常的深拷贝版本存入待合并异常集合
            // TODO 2025.8.4: 后续添加当关联的监测因子累计异常计数接近阈值时,才存入集合的逻辑
            else if (res.isNotEmpty()) {
                remainingExceptions.add(RemainException(res, c))
            }
        }
 
        // 存储合并异常
        combinedExc.forEach {
            result.add(newResult(it))
        }
    }
 
    /**
     * 匹配关联异常因子
     * @param comb 关联因子关系
     * @param exceptions 各监测因子异常集合
     * @return exist表示是否找到关联关系[comb]中所有的因子,res表示找到的结果
     */
    private fun matchCombFactor(
        comb: List<FactorType>,
        exceptions: List<Pair<FactorFilter.SelectedFactor, T>>,
    ): Pair<Boolean, MutableList<Pair<FactorFilter.SelectedFactor, T>>> {
        val res = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
        var exist = true
        // 查看组合内的所有因子是否都同时出现异常
        comb.forEach { f ->
            val r = exceptions.find { e ->
                e.first.main == f
            }
            if (r != null) {
                res.add(r)
            } else {
                exist = false
            }
        }
        return exist to res
    }
 
    abstract fun onNewResult(result: List<Y>)
 
    /**
     * 在异常生成结果后,进行初始化
     */
    private fun clearExceptions(data: BaseRealTimeData) {
        // 此时latestExceptions中应该包含的依旧是本次数据周期内的所有异常
        latestExceptions.forEach {
            it.second.refreshWithNextException(data)
        }
        latestExceptions.clear()
        result.clear()
    }
 
    /**
     * 生成一条异常分析结果
     */
    abstract fun newResult(tag: T, factor: FactorFilter.SelectedFactor): Y
 
    abstract fun newResult(exceptions: List<Pair<FactorFilter.SelectedFactor, ExceptionTag>>): Y
 
}