From 6fdacca914ef38e6cc91292ef07c5af32bd92991 Mon Sep 17 00:00:00 2001 From: hcong <1050828145@qq.com> Date: 星期二, 03 十二月 2024 11:12:40 +0800 Subject: [PATCH] 1. BgTask增加onStatusChange参数,通过外界传递如何发送状态改变的消息 2. BackgroundTaskCtrl.kt 创建了BgTask对象并传递了onStatusChange参数 3. WebSocketMessageUtil 类名修改为 WebSocketMessageDecoder 4. SendWebSocketMessageUtil 类名修改为 WebSocketSendMessageUtil --- src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt | 51 +++++++++++++++++++++++++++++++++------------------ 1 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt index a7c6250..492e15e 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt @@ -1,8 +1,8 @@ package cn.flightfeather.supervision.common.executor -import cn.flightfeather.supervision.common.exception.ResponseErrorException +import cn.flightfeather.supervision.common.exception.BizException +import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil import org.springframework.stereotype.Component -import java.time.LocalDateTime import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import kotlin.jvm.Throws @@ -21,14 +21,27 @@ /** * 鏂板浠诲姟 */ - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun newTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask { if (!taskCollection.containsKey(type)) { taskCollection[type] = ConcurrentHashMap<String, BgTask>() } val taskSet = taskCollection[type]!! - if (taskSet.containsKey(id)) throw ResponseErrorException("鏃犳硶鍒涘缓浠诲姟锛� 浠诲姟[${name}]鐨刬d閲嶅") - val t = BgTask(type, id, name, task) + // 鍒ゆ柇浠诲姟id鏄惁瀛樺湪 + if (taskSet.containsKey(id)) { + val oldTask = taskSet[id] + // 褰撲换鍔″凡缁忕粨鏉熸椂锛岀洿鎺ュ垹闄わ紝鏇挎崲涓烘柊浠诲姟 + if (oldTask?.taskStatus?.status != TaskStatus.WAITING || oldTask.taskStatus.status != TaskStatus.RUNNING) { + taskSet.remove(id) + } + else { + throw BizException("鏃犳硶閲嶅鍒涘缓浠诲姟") + } + } + val t = BgTask(type, id, name, task) { status -> + // 鍙戦�佹秷鎭� + WebSocketSendMessageUtil.sendBgTaskMessage(status) + } taskSet[id] = t return t } @@ -36,24 +49,26 @@ /** * 寮�濮嬩换鍔� */ - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun startTask(type: BgTaskType, id: String): BgTask { - val taskSet = taskCollection[type] ?: throw throw ResponseErrorException("鏃犳硶寮�鍚换鍔★紝璇ヤ换鍔$被鍨媅${type.des}]涓嶅瓨鍦�") - val t = taskSet[id] ?: throw ResponseErrorException("鏃犳硶寮�鍚换鍔★紝璇ヤ换鍔${id}]涓嶅瓨鍦�") + val taskSet = taskCollection[type] ?: throw throw BizException("鏃犳硶寮�鍚换鍔★紝璇ヤ换鍔$被鍨媅${type.des}]涓嶅瓨鍦�") + val t = taskSet[id] ?: throw BizException("鏃犳硶寮�鍚换鍔★紝璇ヤ换鍔${id}]涓嶅瓨鍦�") return startTask(t) } - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun startTask(task: BgTask): BgTask { if (task.taskStatus.status != TaskStatus.WAITING) { if (task.taskStatus.status == TaskStatus.RUNNING) { - throw ResponseErrorException("鏃犳硶寮�鍚换鍔★紝浠诲姟[${task.name}]姝e湪鎵ц") + throw BizException("鏃犳硶寮�鍚换鍔★紝浠诲姟[${task.name}]姝e湪鎵ц") } else { - throw ResponseErrorException("鏃犳硶寮�鍚换鍔★紝浠诲姟[${task.name}]宸茬粨鏉�") + throw BizException("鏃犳硶寮�鍚换鍔★紝浠诲姟[${task.name}]宸茬粨鏉�") } } else { task.ready() - task.future = executorService.submit { task.execute() } + task.future = executorService.submit { + task.execute() + } return task } } @@ -61,7 +76,7 @@ /** * 鏂板骞跺紑濮嬩换鍔� */ - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun startNewTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask { val t = newTask(type, id, name, task) return startTask(t) @@ -96,11 +111,11 @@ /** * 寮哄埗鍏抽棴浠诲姟 */ - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun shutDownTask(type: BgTaskType, id: String?): List<BgTaskStatus?> { - val taskMap = taskCollection[type] ?: throw ResponseErrorException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔$被鍨媅${type.des}]鏈垱寤�") + val taskMap = taskCollection[type] ?: throw BizException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔$被鍨媅${type.des}]鏈垱寤�") return if (id != null) { - val task = taskMap[id] ?: throw ResponseErrorException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔${id}]涓嶅瓨鍦�") + val task = taskMap[id] ?: throw BizException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔${id}]涓嶅瓨鍦�") task.shutdown() listOf(task.taskStatus) } else { @@ -113,11 +128,11 @@ } } - @Throws(ResponseErrorException::class) + @Throws(BizException::class) fun removeTask(type: BgTaskType, id: String): Boolean { val statusList = shutDownTask(type, id) if (statusList.isNotEmpty()) { - val s = statusList.first() ?: throw ResponseErrorException("鏃犳硶绉婚櫎浠诲姟锛屼换鍔′笉瀛樺湪") + val s = statusList.first() ?: throw BizException("鏃犳硶绉婚櫎浠诲姟锛屼换鍔′笉瀛樺湪") taskCollection[s.type]?.remove(s.id) return true } -- Gitblit v1.9.3