| | |
| | | |
| | | import com.flightfeather.uav.biz.FactorFilter |
| | | import com.flightfeather.uav.biz.dataanalysis.model.ExceptionTag |
| | | import com.flightfeather.uav.biz.sourcetrace.model.RemainException |
| | | import com.flightfeather.uav.domain.entity.BaseRealTimeData |
| | | import com.flightfeather.uav.lightshare.eunm.ExceptionStatusType |
| | | import com.flightfeather.uav.socket.eunm.FactorType |
| | |
| | | * 连续类型的异常分析基类,适用于当前数据与相邻数据之间有关联关系的情况 |
| | | */ |
| | | abstract class BaseExceptionContinuous<T : ExceptionTag, V : BaseAnalysisConfig, Y : BaseExceptionResult>( |
| | | config: V, private val tagClz: Class<T> |
| | | config: V, private val tagClz: Class<T>, |
| | | ) : BaseExceptionAnalysis<V, Y>(config) { |
| | | |
| | | enum class JudgeMethod(val des: String) { |
| | | M1("在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立"), |
| | | M2("要求数据不间断连续出现N次异常后,认为该异常成立"), |
| | | } |
| | | |
| | | companion object { |
| | | // 记录异常数据段时,分别向起始前和末尾后额外记录的数据个数偏移量 |
| | |
| | | // 末尾数据对象 |
| | | protected var lastData: BaseRealTimeData? = null |
| | | |
| | | // 最新的一组异常,记录单因子异常 |
| | | val latestExceptions = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>() |
| | | |
| | | /** |
| | | * 后置判断:当相邻数据时间不连续时,或者满足自定义条件时,对之前已有的异常进行记录 |
| | | * 最新的一组合并异常,根据配置参数从[latestExceptions]单因子异常中,合并异常 |
| | | */ |
| | | open fun afterExcCheck(isContinue: Boolean, tag: T, hasException: Boolean?): Boolean { |
| | | return !isContinue || needCut(tag, hasException) |
| | | } |
| | | protected val latestCombinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>() |
| | | |
| | | // 记录需要延迟数据周期进行合并的异常 |
| | | val remainingExceptions = mutableListOf<RemainException<T>>() |
| | | |
| | | /** |
| | | * 异常结果 |
| | | */ |
| | | val result = mutableListOf<Y>() |
| | | |
| | | /** |
| | | * 不适用于此异常类型的监测因子 |
| | | */ |
| | | open var excludedFactor: List<FactorType> = emptyList() |
| | | |
| | | abstract var judgeMethod: JudgeMethod |
| | | |
| | | /** |
| | | * 立即判断:当出现异常时,缓存异常数据的同时,立即对已有异常进行判断是否满足异常结果要求 |
| | |
| | | } |
| | | |
| | | /** |
| | | * 判断数据量级在异常判断的范围内 |
| | | * 默认所有量级都在异常判断的范围内 |
| | | */ |
| | | open fun judgeDataScale(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> { |
| | | val res = mutableMapOf<FactorType, Boolean>() |
| | | config.factorFilter.mainList().forEach { f -> res[f] = true } |
| | | return res |
| | | } |
| | | |
| | | /** |
| | | * 判断前后数据是否满足异常条件 |
| | | */ |
| | | abstract fun judgeException(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> |
| | |
| | | abstract fun judgeExceptionCount(tag: T, factorType: FactorType?): Boolean |
| | | |
| | | /** |
| | | * 判断监测因子是否出现异常 |
| | | */ |
| | | open fun judge(p: BaseRealTimeData?, n: BaseRealTimeData): MutableMap<FactorType, Boolean> { |
| | | val jds = judgeDataScale(p, n) |
| | | val jex = judgeException(p, n) |
| | | val res = mutableMapOf<FactorType, Boolean>() |
| | | jds.forEach { (t, u) -> |
| | | res[t] = u && jex[t] ?: false |
| | | } |
| | | return res |
| | | } |
| | | |
| | | /** |
| | | * 异常数据的截取判断 |
| | | * @return |
| | | */ |
| | | open fun needCut(tag: T, hasException: Boolean?): Boolean { |
| | | // 默认判断条件为 当异常不再重复出现时,形成异常结果 |
| | | open fun needCut(tag: T, hasException: Boolean?, data: BaseRealTimeData): Boolean { |
| | | // 默认判断条件为 当异常不再重复出现时 |
| | | return tag.exceptionExisted && hasException == false |
| | | } |
| | | |
| | |
| | | |
| | | override fun onNextData(data: BaseRealTimeData) { |
| | | val isContinue = isContinuous(lastData, data) |
| | | val hasException = judgeException(lastData, data) |
| | | val hasException = judge(lastData, data) |
| | | config.factorFilter.selectedList.forEach { s -> |
| | | val f = s.main |
| | | tagMap[f]?.let { |
| | | it.addHistoryData(data) |
| | | // 排除此异常类型不适用的监测因子 |
| | | if (excludedFactor.contains(f)) return@forEach |
| | | |
| | | tagMap[f]?.let { |
| | | it.eIndex++ |
| | | // 起始数据 |
| | | it.endData = data |
| | |
| | | it.refreshWithNextException(data) |
| | | } |
| | | |
| | | // 对于异常的生成分别执行后置判断、和立即判断 |
| | | // 1. 后置判断:当相邻数据时间不连续时,或者满足自定义条件时,对之前已有的异常进行记录,形成异常结果 |
| | | if (afterExcCheck(isContinue, it, hasException[f])) { |
| | | // 数据不连续时或者满足主动截断条件时,记录异常情况 |
| | | recordException(s, it, data) |
| | | // 按照不同的方式进行异常判断 |
| | | when (judgeMethod) { |
| | | JudgeMethod.M1 -> judgeMethod1(hasException, f, it, data, s) |
| | | JudgeMethod.M2 -> judgeMethod2(isContinue, hasException, f, it, data, s) |
| | | } |
| | | // 2. 立即判断:当出现异常时,缓存异常数据的同时,立即对已有异常进行判断是否满足异常结果要求 |
| | | else if (hasException[f] == true) { |
| | | // 有异常出现时,记录异常数据 |
| | | it.addExceptionData(data) |
| | | // 当立即判断通过时,形成异常结果 |
| | | if (immeExcCheck(it, f)) { |
| | | recordException(s, it, data) |
| | | } |
| | | } |
| | | // 3. 数据正常,无任何异常时d |
| | | // TODO("2025.6.3:其他子类的此处刷新逻辑待完成“) |
| | | else { |
| | | it.refreshWithNoException(data) |
| | | } |
| | | |
| | | it.addHistoryData(data) |
| | | } |
| | | } |
| | | lastData = data |
| | | |
| | | removeSingleFactor(data) |
| | | val fittedComb = checkDelayedExceptions(data) |
| | | mergeExceptionResult(data, fittedComb) |
| | | onNewResult(result) |
| | | clearExceptions(data) |
| | | } |
| | | |
| | | override fun onDone() { |
| | | checkResult(exceptionStatus = ExceptionStatusType.Ended) |
| | | } |
| | | |
| | | /** |
| | | * 数据异常判断方式一 |
| | | * 在一定的空间和时间范围内,数据累计出现N次异常后,认为该异常成立 |
| | | */ |
| | | private fun judgeMethod1( |
| | | hasException: MutableMap<FactorType, Boolean>, |
| | | f: FactorType, |
| | | tag: T, |
| | | data: BaseRealTimeData, |
| | | s: FactorFilter.SelectedFactor, |
| | | ) { |
| | | // 出现异常 |
| | | if (hasException[f] == true) { |
| | | // 判断数据在空间和时间变化上是否超出限定范围,若超出则删除遗留的异常记录,刷新起始点数据 |
| | | if (needCut(tag, hasException[f], data)) { |
| | | tag.refreshWithNextException(data) |
| | | } |
| | | // 记录异常数据 |
| | | tag.addExceptionData(data) |
| | | // 当立即判断通过时,形成异常结果 |
| | | if (immeExcCheck(tag, f)) { |
| | | recordException(s, tag, data) |
| | | } |
| | | } |
| | | // 数据正常,并且没有历史异常数据时,刷新起始点数据 |
| | | else if (!tag.exceptionExisted) { |
| | | tag.refreshWithNextException(data) |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 数据异常判断方式二 |
| | | * 要求数据不间断连续出现N次异常后,认为该异常成立 |
| | | */ |
| | | private fun judgeMethod2( |
| | | isContinue: Boolean, |
| | | hasException: MutableMap<FactorType, Boolean>, |
| | | f: FactorType, |
| | | tag: T, |
| | | data: BaseRealTimeData, |
| | | s: FactorFilter.SelectedFactor, |
| | | ) { |
| | | // 当相邻数据时间不连续时,刷新起始点数据,移除历史异常记录 |
| | | if (!isContinue) { |
| | | tag.refreshWithNextException(data) |
| | | } |
| | | // 出现异常 |
| | | else if (hasException[f] == true) { |
| | | // 有异常出现时,记录异常数据 |
| | | tag.addExceptionData(data) |
| | | // 当立即判断通过时,形成异常结果 |
| | | if (immeExcCheck(tag, f)) { |
| | | recordException(s, tag, data) |
| | | } |
| | | } |
| | | // 数据正常,刷新起始点数据,移除历史异常记录 |
| | | else { |
| | | tag.refreshWithNextException(data) |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | fun recordException(factor: FactorFilter.SelectedFactor, tag: T, data: BaseRealTimeData) { |
| | | checkResult(factor, ExceptionStatusType.Ended) |
| | | tag.refreshWithNextException(data) |
| | | // tag.refreshWithNextException(data) |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | open fun checkResult( |
| | | factor: FactorFilter.SelectedFactor? = null, |
| | | exceptionStatus: ExceptionStatusType = ExceptionStatusType.InProgress |
| | | exceptionStatus: ExceptionStatusType = ExceptionStatusType.InProgress, |
| | | ) { |
| | | val tag = tagMap[factor?.main] |
| | | if (factor != null && tag != null) { |
| | |
| | | } else { |
| | | config.factorFilter.selectedList.forEach { f -> |
| | | val tag1 = tagMap[f.main] ?: return@forEach |
| | | if (tag1.exceptionExisted && judgeExceptionCount(tag1, null)) { |
| | | if (tag1.exceptionExisted && judgeExceptionCount(tag1, f.main)) { |
| | | onNewException(tag1, f, exceptionStatus) |
| | | } |
| | | } |
| | |
| | | open fun onNewException(tag: T, factor: FactorFilter.SelectedFactor, exceptionStatus: ExceptionStatusType) { |
| | | if (tag.startData == null) return |
| | | // val ex = newResult(tag.startData!!, tag.endData, factor, tag.exceptionData) |
| | | val ex = newResult(tag, factor) |
| | | .apply { status = exceptionStatus.value } |
| | | // 异常已创建时,更新异常信息 |
| | | if (tag.exceptionCreated) { |
| | | // 将最新的异常的guid赋值给ex |
| | | val lastEx = tag.exceptionResult.last() |
| | | ex.guid = lastEx.guid |
| | | tag.exceptionResult.removeLast() |
| | | tag.exceptionResult.add(ex) |
| | | // val ex = newResult(tag, factor) |
| | | // .apply { status = exceptionStatus.value } |
| | | // // 异常已创建时,更新异常信息 |
| | | // if (tag.exceptionCreated) { |
| | | // // 将最新的异常的guid赋值给ex |
| | | // val lastEx = tag.exceptionResult.last() |
| | | // ex.guid = lastEx.guid |
| | | // tag.exceptionResult.removeLast() |
| | | // tag.exceptionResult.add(ex) |
| | | // } |
| | | // // 异常未创建时,新建异常信息 |
| | | // else { |
| | | // tag.exceptionResult.add(ex) |
| | | // tag.exceptionCreated = true |
| | | // } |
| | | // val tagClone = tagClz.newInstance() |
| | | // BeanUtils.copyProperties(tag, tagClone) |
| | | latestExceptions.add(factor to tag) |
| | | } |
| | | |
| | | /** |
| | | * 将不在关联关系中的监测因子异常存储,并剔除 |
| | | */ |
| | | fun removeSingleFactor(data: BaseRealTimeData) { |
| | | if (latestExceptions.isEmpty()) return |
| | | |
| | | // 查找不在因子关联组合中的异常因子 |
| | | val sfList = latestExceptions.filter { |
| | | config.factorFilter.combination.find { c -> c.find { f -> f == it.first.main } != null } == null |
| | | } |
| | | // 异常未创建时,新建异常信息 |
| | | else { |
| | | tag.exceptionResult.add(ex) |
| | | // resultList.add(ex) |
| | | tag.exceptionCreated = true |
| | | // 生成对应的异常结果,并初始化该异常 |
| | | sfList.forEach { |
| | | result.add(newResult(listOf(it))) |
| | | it.second.refreshWithNextException(data) |
| | | } |
| | | // 剔除 |
| | | latestExceptions.removeAll(sfList) |
| | | } |
| | | |
| | | /** |
| | | * 检查延迟的待合并异常与当前异常是否能匹配 |
| | | * 1. 将遗留的超过等待数据周期的异常存储 |
| | | * 2. 将匹配成功的合并异常存储,同时将关联关系标记为已匹配 |
| | | * 3. 保留依旧未合并成功并且可继续等待的异常 |
| | | * @return 被匹配成功的关联关系 |
| | | */ |
| | | fun checkDelayedExceptions(data: BaseRealTimeData): List<List<FactorType>> { |
| | | if (latestExceptions.isEmpty()) return emptyList() |
| | | |
| | | // 被匹配成功的监测因子关联关系 |
| | | val fittedComb = mutableListOf<List<FactorType>>() |
| | | // 遗留的进入下一个数据周期做判断的待合并异常集合 |
| | | val leftExc = mutableListOf<RemainException<T>>() |
| | | // 成功匹配的合并异常集合 |
| | | val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>() |
| | | // 本次数据周期中,被匹配成功的异常集合 |
| | | val exceps = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>() |
| | | remainingExceptions.forEach { |
| | | // 检查本次数据周期的异常中,是否包含因子关联关系中的异常 |
| | | val combRes = matchCombFactor(it.combination, latestExceptions) |
| | | val res = combRes.second |
| | | // 判断本次数据周期中找到的因子和已有的因子是否满足关联关系 |
| | | val findFactors = mutableListOf<FactorType>() |
| | | res.forEach {r -> findFactors.add(r.first.main) } |
| | | it.exceptions.forEach {r -> findFactors.add(r.first.main) } |
| | | val isFitAll = findFactors.distinct() == it.combination |
| | | // 如果已经没有缺失的异常因子,则可合并为组合异常 |
| | | if (isFitAll) { |
| | | fittedComb.add(it.combination) |
| | | // 将查找结果添加至已有异常集合中 |
| | | it.addExceptions(res) |
| | | // // 记录被匹配成功的异常 |
| | | // res.forEach { r-> |
| | | // if (exceps.find { e -> e.second == r.second } == null) { |
| | | // exceps.add(r) |
| | | // } |
| | | // } |
| | | // 将合并异常存储 |
| | | combinedExc.add(it.exceptions) |
| | | |
| | | } |
| | | // 否则留作下次数据周期再判定存入待合并异常集合 |
| | | else { |
| | | it.period++ |
| | | // 当待合并的异常等待数据周期大于设定值时,不再等待,直接输出异常 |
| | | if (it.period > config.maxDelayPeriod) { |
| | | result.add(newResult(it.exceptions)) |
| | | return@forEach |
| | | } else { |
| | | fittedComb.add(it.combination) |
| | | // 将查找结果添加至已有异常集合中 |
| | | it.addExceptions(res) |
| | | // // 记录被匹配成功的异常 |
| | | // res.forEach { r-> |
| | | // if (exceps.find { e -> e.second == r.second } == null) { |
| | | // exceps.add(r) |
| | | // } |
| | | // } |
| | | leftExc.add(it) |
| | | } |
| | | } |
| | | } |
| | | // 存储合并异常 |
| | | combinedExc.forEach { |
| | | result.add(newResult(it)) |
| | | } |
| | | // // 将被匹配成功的异常刷新,并从本次数据周期的异常集合中移除 |
| | | // exceps.forEach { r-> r.second.refreshWithNextException(data) } |
| | | // latestExceptions.removeAll(exceps) |
| | | // 保留未匹配的组合 |
| | | remainingExceptions.clear() |
| | | remainingExceptions.addAll(leftExc) |
| | | |
| | | return fittedComb |
| | | } |
| | | |
| | | /** |
| | | * 合并异常 |
| | | * @param data 当前监测数据 |
| | | * @param fittedComb 在遗留的异常[remainingExceptions]判断中,已经进行匹配判断的关联关系,将不再进行匹配 |
| | | */ |
| | | open fun mergeExceptionResult(data: BaseRealTimeData, fittedComb: List<List<FactorType>>) { |
| | | if (latestExceptions.isEmpty()) return |
| | | |
| | | val combinedExc = mutableListOf<List<Pair<FactorFilter.SelectedFactor, T>>>() |
| | | // 遍历所有的因子组合 |
| | | config.factorFilter.combination.forEach { c -> |
| | | /** |
| | | * 跳过已经在[checkDelayedExceptions]中判断过的关联关系 |
| | | */ |
| | | if (fittedComb.indexOf(c) >= 0) return@forEach |
| | | |
| | | val combRes = matchCombFactor(c, latestExceptions) |
| | | val res = combRes.second |
| | | val exist = combRes.first |
| | | // 如果组合内的所有因子都存在异常,则存储为合并异常 |
| | | if (exist) { |
| | | // 将合并异常存储 |
| | | combinedExc.add(res) |
| | | } |
| | | // 否则将异常的深拷贝版本存入待合并异常集合 |
| | | // TODO 2025.8.4: 后续添加当关联的监测因子累计异常计数接近阈值时,才存入集合的逻辑 |
| | | else if (res.isNotEmpty()) { |
| | | remainingExceptions.add(RemainException(res, c)) |
| | | } |
| | | } |
| | | |
| | | // 存储合并异常 |
| | | combinedExc.forEach { |
| | | result.add(newResult(it)) |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 匹配关联异常因子 |
| | | * @param comb 关联因子关系 |
| | | * @param exceptions 各监测因子异常集合 |
| | | * @return exist表示是否找到关联关系[comb]中所有的因子,res表示找到的结果 |
| | | */ |
| | | private fun matchCombFactor( |
| | | comb: List<FactorType>, |
| | | exceptions: List<Pair<FactorFilter.SelectedFactor, T>>, |
| | | ): Pair<Boolean, MutableList<Pair<FactorFilter.SelectedFactor, T>>> { |
| | | val res = mutableListOf<Pair<FactorFilter.SelectedFactor, T>>() |
| | | var exist = true |
| | | // 查看组合内的所有因子是否都同时出现异常 |
| | | comb.forEach { f -> |
| | | val r = exceptions.find { e -> |
| | | e.first.main == f |
| | | } |
| | | if (r != null) { |
| | | res.add(r) |
| | | } else { |
| | | exist = false |
| | | } |
| | | } |
| | | return exist to res |
| | | } |
| | | |
| | | abstract fun onNewResult(result: List<Y>) |
| | | |
| | | /** |
| | | * 在异常生成结果后,进行初始化 |
| | | */ |
| | | private fun clearExceptions(data: BaseRealTimeData) { |
| | | // 此时latestExceptions中应该包含的依旧是本次数据周期内的所有异常 |
| | | latestExceptions.forEach { |
| | | it.second.refreshWithNextException(data) |
| | | } |
| | | latestExceptions.clear() |
| | | result.clear() |
| | | } |
| | | |
| | | /** |
| | | * 生成一条异常分析结果 |
| | | */ |
| | | abstract fun newResult(tag:T, factor: FactorFilter.SelectedFactor): Y |
| | | abstract fun newResult(tag: T, factor: FactorFilter.SelectedFactor): Y |
| | | |
| | | abstract fun newResult(exceptions: List<Pair<FactorFilter.SelectedFactor, ExceptionTag>>): Y |
| | | |
| | | } |