package com.flightfeather.uav.biz.dataanalysis
|
|
import com.flightfeather.uav.biz.FactorFilter
|
import com.flightfeather.uav.biz.dataanalysis.model.ExceptionTag
|
import com.flightfeather.uav.biz.sourcetrace.model.RemainException
|
import com.flightfeather.uav.domain.entity.BaseRealTimeData
|
import com.flightfeather.uav.lightshare.eunm.ExceptionStatusType
|
import com.flightfeather.uav.socket.eunm.FactorType
|
import java.time.Duration
|
|
/**
|
* 连续类型的异常分析基类,适用于当前数据与相邻数据之间有关联关系的情况
|
*/
|
abstract class BaseExceptionContinuous<T : ExceptionTag, V : BaseAnalysisConfig, Y : BaseExceptionResult>(
|
config: V, private val tagClz: Class<T>,
|
) : BaseExceptionAnalysis<V, Y>(config) {
|
|
enum class JudgeMethod(val des: String) {
|
M1("在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立"),
|
M2("要求数据不间断连续出现N次异常后,认为该异常成立"),
|
}
|
|
companion object {
|
// 记录异常数据段时,分别向起始前和末尾后额外记录的数据个数偏移量
|
private const val OFFSET = 10
|
}
|
|
protected val tagMap = mutableMapOf<FactorType, T>()
|
|
// 起始数据与末尾数据间隔
|
open var durationCount = 1
|
|
// 末尾数据对象
|
protected var lastData: BaseRealTimeData? = null
|
|
// 最新的一组异常,记录单因子异常
|
val latestExceptions = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
|
|
/**
|
* 最新的一组合并异常,根据配置参数从[latestExceptions]单因子异常中,合并异常
|
*/
|
protected val latestCombinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
|
|
// 记录需要延迟数据周期进行合并的异常
|
val remainingExceptions = mutableListOf<RemainException<T>>()
|
|
/**
|
* 异常结果
|
*/
|
val result = mutableListOf<Y>()
|
|
/**
|
* 不适用于此异常类型的监测因子
|
*/
|
open var excludedFactor: List<FactorType> = emptyList()
|
|
abstract var judgeMethod: JudgeMethod
|
|
/**
|
* 立即判断:当出现异常时,缓存异常数据的同时,立即对已有异常进行判断是否满足异常结果要求
|
*/
|
open fun immeExcCheck(tag: T, factorType: FactorType): Boolean {
|
return false
|
}
|
|
/**
|
* 判断相邻数据是否连续
|
*/
|
open fun isContinuous(d1: BaseRealTimeData?, d2: BaseRealTimeData?): Boolean {
|
if (d1 == null || d2 == null) return true
|
|
val t1 = d1.dataTime
|
val t2 = d2.dataTime
|
return Duration.between(t1?.toInstant(), t2?.toInstant()).toMillis() <= (20 * 1000)
|
}
|
|
/**
|
* 判断数据量级在异常判断的范围内
|
* 默认所有量级都在异常判断的范围内
|
*/
|
open fun judgeDataScale(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> {
|
val res = mutableMapOf<FactorType, Boolean>()
|
config.factorFilter.mainList().forEach { f -> res[f] = true }
|
return res
|
}
|
|
/**
|
* 判断前后数据是否满足异常条件
|
*/
|
abstract fun judgeException(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean>
|
|
/**
|
* 判断异常出现的连续个数是否满足条件
|
* @param tag 异常数据对象
|
*/
|
abstract fun judgeExceptionCount(tag: T, factorType: FactorType?): Boolean
|
|
/**
|
* 判断监测因子是否出现异常
|
*/
|
open fun judge(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> {
|
val jds = judgeDataScale(p, n)
|
val jex = judgeException(p, n)
|
val res = mutableMapOf<FactorType, Boolean>()
|
jds.forEach { (t, u) ->
|
res[t] = u && jex[t] ?: false
|
}
|
return res
|
}
|
|
/**
|
* 异常数据的截取判断
|
* @return
|
*/
|
open fun needCut(tag: T, hasException: Boolean?, data: BaseRealTimeData): Boolean {
|
// 默认判断条件为 当异常不再重复出现时
|
return tag.exceptionExisted && hasException == false
|
}
|
|
override fun init() {
|
super.init()
|
lastData = null
|
tagMap.clear()
|
config.factorFilter.mainList().forEach { f ->
|
tagMap[f] = tagClz.newInstance()
|
}
|
}
|
|
override fun onNextData(data: BaseRealTimeData) {
|
val isContinue = isContinuous(lastData, data)
|
val hasException = judge(lastData, data)
|
config.factorFilter.selectedList.forEach { s ->
|
val f = s.main
|
// 排除此异常类型不适用的监测因子
|
if (excludedFactor.contains(f)) return@forEach
|
|
tagMap[f]?.let {
|
it.eIndex++
|
// 起始数据
|
it.endData = data
|
if (it.startData == null) {
|
it.refreshWithNextException(data)
|
}
|
|
// 按照不同的方式进行异常判断
|
when (judgeMethod) {
|
JudgeMethod.M1 -> judgeMethod1(hasException, f, it, data, s)
|
JudgeMethod.M2 -> judgeMethod2(isContinue, hasException, f, it, data, s)
|
}
|
|
it.addHistoryData(data)
|
}
|
}
|
lastData = data
|
|
removeSingleFactor(data)
|
val fittedComb = checkDelayedExceptions(data)
|
mergeExceptionResult(data, fittedComb)
|
onNewResult(result)
|
clearExceptions(data)
|
}
|
|
override fun onDone() {
|
checkResult(exceptionStatus = ExceptionStatusType.Ended)
|
}
|
|
/**
|
* 数据异常判断方式一
|
* 在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立
|
*/
|
private fun judgeMethod1(
|
hasException: MutableMap<FactorType, Boolean>,
|
f: FactorType,
|
tag: T,
|
data: BaseRealTimeData,
|
s: FactorFilter.SelectedFactor,
|
) {
|
// 出现异常
|
if (hasException[f] == true) {
|
// 判断数据在空间和时间变化上是否超出限定范围,若超出则删除遗留的异常记录,刷新起始点数据
|
if (needCut(tag, hasException[f], data)) {
|
tag.refreshWithNextException(data)
|
}
|
// 记录异常数据
|
tag.addExceptionData(data)
|
// 当立即判断通过时,形成异常结果
|
if (immeExcCheck(tag, f)) {
|
recordException(s, tag, data)
|
}
|
}
|
// 数据正常,并且没有历史异常数据时,刷新起始点数据
|
else if (!tag.exceptionExisted) {
|
tag.refreshWithNextException(data)
|
}
|
}
|
|
/**
|
* 数据异常判断方式二
|
* 要求数据不间断连续出现N次异常后,认为该异常成立
|
*/
|
private fun judgeMethod2(
|
isContinue: Boolean,
|
hasException: MutableMap<FactorType, Boolean>,
|
f: FactorType,
|
tag: T,
|
data: BaseRealTimeData,
|
s: FactorFilter.SelectedFactor,
|
) {
|
// 当相邻数据时间不连续时,刷新起始点数据,移除历史异常记录
|
if (!isContinue) {
|
tag.refreshWithNextException(data)
|
}
|
// 出现异常
|
else if (hasException[f] == true) {
|
// 有异常出现时,记录异常数据
|
tag.addExceptionData(data)
|
// 当立即判断通过时,形成异常结果
|
if (immeExcCheck(tag, f)) {
|
recordException(s, tag, data)
|
}
|
}
|
// 数据正常,刷新起始点数据,移除历史异常记录
|
else {
|
tag.refreshWithNextException(data)
|
}
|
}
|
|
/**
|
* 异常结束,记录异常
|
* 判断已有的异常数据是否满足异常条件,满足则记录,不满足则略过
|
*/
|
fun recordException(factor: FactorFilter.SelectedFactor, tag: T, data: BaseRealTimeData) {
|
checkResult(factor, ExceptionStatusType.Ended)
|
// tag.refreshWithNextException(data)
|
}
|
|
/**
|
* 检查连续异常结束时,是否符合异常存储条件
|
*/
|
open fun checkResult(
|
factor: FactorFilter.SelectedFactor? = null,
|
exceptionStatus: ExceptionStatusType = ExceptionStatusType.InProgress,
|
) {
|
val tag = tagMap[factor?.main]
|
if (factor != null && tag != null) {
|
if (tag.exceptionExisted && judgeExceptionCount(tag, factor.main)) {
|
onNewException(tag, factor, exceptionStatus)
|
}
|
} else {
|
config.factorFilter.selectedList.forEach { f ->
|
val tag1 = tagMap[f.main] ?: return@forEach
|
if (tag1.exceptionExisted && judgeExceptionCount(tag1, f.main)) {
|
onNewException(tag1, f, exceptionStatus)
|
}
|
}
|
}
|
}
|
|
/**
|
* 新增或更新一条异常
|
*/
|
open fun onNewException(tag: T, factor: FactorFilter.SelectedFactor, exceptionStatus: ExceptionStatusType) {
|
if (tag.startData == null) return
|
// val ex = newResult(tag.startData!!, tag.endData, factor, tag.exceptionData)
|
// val ex = newResult(tag, factor)
|
// .apply { status = exceptionStatus.value }
|
// // 异常已创建时,更新异常信息
|
// if (tag.exceptionCreated) {
|
// // 将最新的异常的guid赋值给ex
|
// val lastEx = tag.exceptionResult.last()
|
// ex.guid = lastEx.guid
|
// tag.exceptionResult.removeLast()
|
// tag.exceptionResult.add(ex)
|
// }
|
// // 异常未创建时,新建异常信息
|
// else {
|
// tag.exceptionResult.add(ex)
|
// tag.exceptionCreated = true
|
// }
|
// val tagClone = tagClz.newInstance()
|
// BeanUtils.copyProperties(tag, tagClone)
|
latestExceptions.add(factor to tag)
|
}
|
|
/**
|
* 将不在关联关系中的监测因子异常存储,并剔除
|
*/
|
fun removeSingleFactor(data: BaseRealTimeData) {
|
if (latestExceptions.isEmpty()) return
|
|
// 查找不在因子关联组合中的异常因子
|
val sfList = latestExceptions.filter {
|
config.factorFilter.combination.find { c -> c.find { f -> f == it.first.main } != null } == null
|
}
|
// 生成对应的异常结果,并初始化该异常
|
sfList.forEach {
|
result.add(newResult(listOf(it)))
|
it.second.refreshWithNextException(data)
|
}
|
// 剔除
|
latestExceptions.removeAll(sfList)
|
}
|
|
/**
|
* 检查延迟的待合并异常与当前异常是否能匹配
|
* 1. 将遗留的超过等待数据周期的异常存储
|
* 2. 将匹配成功的合并异常存储,同时将关联关系标记为已匹配
|
* 3. 保留依旧未合并成功并且可继续等待的异常
|
* @return 被匹配成功的关联关系
|
*/
|
fun checkDelayedExceptions(data: BaseRealTimeData): List<List<FactorType>> {
|
if (latestExceptions.isEmpty()) return emptyList()
|
|
// 被匹配成功的监测因子关联关系
|
val fittedComb = mutableListOf<List<FactorType>>()
|
// 遗留的进入下一个数据周期做判断的待合并异常集合
|
val leftExc = mutableListOf<RemainException<T>>()
|
// 成功匹配的合并异常集合
|
val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
|
// 本次数据周期中,被匹配成功的异常集合
|
val exceps = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
|
remainingExceptions.forEach {
|
// 检查本次数据周期的异常中,是否包含因子关联关系中的异常
|
val combRes = matchCombFactor(it.combination, latestExceptions)
|
val res = combRes.second
|
// 判断本次数据周期中找到的因子和已有的因子是否满足关联关系
|
val findFactors = mutableListOf<FactorType>()
|
res.forEach {r -> findFactors.add(r.first.main) }
|
it.exceptions.forEach {r -> findFactors.add(r.first.main) }
|
val isFitAll = findFactors.distinct() == it.combination
|
// 如果已经没有缺失的异常因子,则可合并为组合异常
|
if (isFitAll) {
|
fittedComb.add(it.combination)
|
// 将查找结果添加至已有异常集合中
|
it.addExceptions(res)
|
// // 记录被匹配成功的异常
|
// res.forEach { r->
|
// if (exceps.find { e -> e.second == r.second } == null) {
|
// exceps.add(r)
|
// }
|
// }
|
// 将合并异常存储
|
combinedExc.add(it.exceptions)
|
|
}
|
// 否则留作下次数据周期再判定存入待合并异常集合
|
else {
|
it.period++
|
// 当待合并的异常等待数据周期大于设定值时,不再等待,直接输出异常
|
if (it.period > config.maxDelayPeriod) {
|
result.add(newResult(it.exceptions))
|
return@forEach
|
} else {
|
fittedComb.add(it.combination)
|
// 将查找结果添加至已有异常集合中
|
it.addExceptions(res)
|
// // 记录被匹配成功的异常
|
// res.forEach { r->
|
// if (exceps.find { e -> e.second == r.second } == null) {
|
// exceps.add(r)
|
// }
|
// }
|
leftExc.add(it)
|
}
|
}
|
}
|
// 存储合并异常
|
combinedExc.forEach {
|
result.add(newResult(it))
|
}
|
// // 将被匹配成功的异常刷新,并从本次数据周期的异常集合中移除
|
// exceps.forEach { r-> r.second.refreshWithNextException(data) }
|
// latestExceptions.removeAll(exceps)
|
// 保留未匹配的组合
|
remainingExceptions.clear()
|
remainingExceptions.addAll(leftExc)
|
|
return fittedComb
|
}
|
|
/**
|
* 合并异常
|
* @param data 当前监测数据
|
* @param fittedComb 在遗留的异常[remainingExceptions]判断中,已经进行匹配判断的关联关系,将不再进行匹配
|
*/
|
open fun mergeExceptionResult(data: BaseRealTimeData, fittedComb: List<List<FactorType>>) {
|
if (latestExceptions.isEmpty()) return
|
|
val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>()
|
// 遍历所有的因子组合
|
config.factorFilter.combination.forEach { c ->
|
/**
|
* 跳过已经在[checkDelayedExceptions]中判断过的关联关系
|
*/
|
if (fittedComb.indexOf(c) >= 0) return@forEach
|
|
val combRes = matchCombFactor(c, latestExceptions)
|
val res = combRes.second
|
val exist = combRes.first
|
// 如果组合内的所有因子都存在异常,则存储为合并异常
|
if (exist) {
|
// 将合并异常存储
|
combinedExc.add(res)
|
}
|
// 否则将异常的深拷贝版本存入待合并异常集合
|
// TODO 2025.8.4: 后续添加当关联的监测因子累计异常计数接近阈值时,才存入集合的逻辑
|
else if (res.isNotEmpty()) {
|
remainingExceptions.add(RemainException(res, c))
|
}
|
}
|
|
// 存储合并异常
|
combinedExc.forEach {
|
result.add(newResult(it))
|
}
|
}
|
|
/**
|
* 匹配关联异常因子
|
* @param comb 关联因子关系
|
* @param exceptions 各监测因子异常集合
|
* @return exist表示是否找到关联关系[comb]中所有的因子,res表示找到的结果
|
*/
|
private fun matchCombFactor(
|
comb: List<FactorType>,
|
exceptions: List<Pair<FactorFilter.SelectedFactor, T>>,
|
): Pair<Boolean, MutableList<Pair<FactorFilter.SelectedFactor, T>>> {
|
val res = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>()
|
var exist = true
|
// 查看组合内的所有因子是否都同时出现异常
|
comb.forEach { f ->
|
val r = exceptions.find { e ->
|
e.first.main == f
|
}
|
if (r != null) {
|
res.add(r)
|
} else {
|
exist = false
|
}
|
}
|
return exist to res
|
}
|
|
abstract fun onNewResult(result: List<Y>)
|
|
/**
|
* 在异常生成结果后,进行初始化
|
*/
|
private fun clearExceptions(data: BaseRealTimeData) {
|
// 此时latestExceptions中应该包含的依旧是本次数据周期内的所有异常
|
latestExceptions.forEach {
|
it.second.refreshWithNextException(data)
|
}
|
latestExceptions.clear()
|
result.clear()
|
}
|
|
/**
|
* 生成一条异常分析结果
|
*/
|
abstract fun newResult(tag: T, factor: FactorFilter.SelectedFactor): Y
|
|
abstract fun newResult(exceptions: List<Pair<FactorFilter.SelectedFactor, ExceptionTag>>): Y
|
|
}
|