From 6fdacca914ef38e6cc91292ef07c5af32bd92991 Mon Sep 17 00:00:00 2001
From: hcong <1050828145@qq.com>
Date: 星期二, 03 十二月 2024 11:12:40 +0800
Subject: [PATCH] 1. BgTask增加onStatusChange参数,通过外界传递如何发送状态改变的消息 2. BackgroundTaskCtrl.kt 创建了BgTask对象并传递了onStatusChange参数 3. WebSocketMessageUtil 类名修改为 WebSocketMessageDecoder 4. SendWebSocketMessageUtil 类名修改为 WebSocketSendMessageUtil
---
/dev/null | 20 ----------
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java | 18 +++++++++
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt | 4 +-
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt | 12 ++---
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt | 3 +
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt | 4 +-
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java | 9 ++--
7 files changed, 34 insertions(+), 36 deletions(-)
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
index 730c3a6..492e15e 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,7 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.common.exception.BizException
-import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil
+import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -38,7 +38,10 @@
throw BizException("鏃犳硶閲嶅鍒涘缓浠诲姟")
}
}
- val t = BgTask(type, id, name, task)
+ val t = BgTask(type, id, name, task) { status ->
+ // 鍙戦�佹秷鎭�
+ WebSocketSendMessageUtil.sendBgTaskMessage(status)
+ }
taskSet[id] = t
return t
}
@@ -65,10 +68,7 @@
task.ready()
task.future = executorService.submit {
task.execute()
- println(task.taskStatus.runTime)
- SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
}
- SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
return task
}
}
@@ -117,13 +117,11 @@
return if (id != null) {
val task = taskMap[id] ?: throw BizException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔${id}]涓嶅瓨鍦�")
task.shutdown()
- SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
listOf(task.taskStatus)
} else {
val res = mutableListOf<BgTaskStatus?>()
taskMap.forEach { (t, u) ->
u.shutdown()
- SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus)
res.add(u.taskStatus)
}
res.sortedBy { it?.createTime }
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
index 66a9315..2ed156b 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
@@ -11,6 +11,7 @@
val id: String,
val name: String,
private val task: () -> Boolean,
+ private val onStatusChange: (status: BgTaskStatus) -> Unit
) {
var taskStatus = BgTaskStatus(type, id, name)
var future: Future<*>? = null
@@ -33,11 +34,13 @@
fun success() {
taskStatus.status = TaskStatus.SUCCESS
complete()
+ onStatusChange(taskStatus)
}
fun fail() {
taskStatus.status = TaskStatus.FAIL
complete()
+ onStatusChange(taskStatus)
}
fun shutdown() {
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
deleted file mode 100644
index e4c9cfc..0000000
--- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package cn.flightfeather.supervision.common.utils;
-
-import cn.flightfeather.supervision.common.executor.BgTaskStatus;
-import cn.flightfeather.supervision.socket.WebSocketMessage;
-import cn.flightfeather.supervision.socket.WebSocketMessageUtil;
-import cn.flightfeather.supervision.socket.WebSocketSenderHandler;
-
-public class SendSocketMessageUtil {
-
- /**
- * 鍙戦�佸悗鍙颁换鍔$殑socket娑堟伅
- * @param bgTaskStatus 娑堟伅鐨勫唴瀹�
- */
- public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
- WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
- bgTaskStatus);
- String message = WebSocketMessageUtil.encodeMessage(webSocketMessage);
- WebSocketSenderHandler.getInstance().broadcast(message);
- }
-}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java
similarity index 89%
rename from src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
rename to src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java
index 1a09218..7603ebc 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java
@@ -3,9 +3,7 @@
import cn.flightfeather.supervision.common.utils.JsonUtil;
import org.springframework.util.StringUtils;
-import java.util.*;
-
-public class WebSocketMessageUtil {
+public class WebSocketMessageDecoder {
private static final String START_STR = "##";
private static final String SPLIT_STR = "&&";
private static final String END_STR = "%%";
@@ -34,7 +32,8 @@
*/
public static WebSocketMessage decodeMessage(String message) {
if (!verificationMessage(message)) {
- return null;
+ // 鍙戞尌涓�涓笉浼氳澶勭悊鐨勬秷鎭�
+ return new WebSocketMessage(-1, "");
}
WebSocketMessage webSocketMessage = new WebSocketMessage();
String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
@@ -47,6 +46,6 @@
* @return 鐢熸垚鐨勬秷鎭瓧绗︿覆
*/
public static String encodeMessage(WebSocketMessage webSocketMessage) {
- return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent()) + END_STR;
+ return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR;
}
}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
new file mode 100644
index 0000000..5fa8920
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
@@ -0,0 +1,18 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.common.executor.BgTaskStatus;
+import cn.flightfeather.supervision.common.utils.Constant;
+
+public class WebSocketSendMessageUtil {
+
+ /**
+ * 鍙戦�佸悗鍙颁换鍔$殑socket娑堟伅
+ * @param bgTaskStatus 娑堟伅鐨勫唴瀹�
+ */
+ public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
+ WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
+ bgTaskStatus);
+ String message = WebSocketMessageDecoder.encodeMessage(webSocketMessage);
+ WebSocketSenderHandler.getInstance().broadcast(message);
+ }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
index 8c3df27..e66e71e 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,7 +1,7 @@
package cn.flightfeather.supervision.socket.config
import cn.flightfeather.supervision.common.utils.Constant
-import cn.flightfeather.supervision.socket.WebSocketMessageUtil
+import cn.flightfeather.supervision.socket.WebSocketMessageDecoder
import cn.flightfeather.supervision.socket.WsSessionManager
import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
import org.springframework.stereotype.Component
@@ -47,7 +47,7 @@
val payload = message.payload
val sessionId = session.attributes["session_id"]
println("server 鎺ユ敹鍒� $sessionId 鍙戦�佺殑 $payload")
- if (WebSocketMessageUtil.decodeMessage(payload).type ==
+ if (WebSocketMessageDecoder.decodeMessage(payload).type ==
Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
index 11b1c62..dcce685 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -4,7 +4,7 @@
import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessage
-import cn.flightfeather.supervision.socket.WebSocketMessageUtil
+import cn.flightfeather.supervision.socket.WebSocketMessageDecoder
import org.springframework.stereotype.Component
import java.time.LocalDateTime
@@ -30,7 +30,7 @@
* 鎺ユ敹蹇冭烦娑堟伅澶勭悊
*/
fun onReceiveHeartMsg(msg: String, sessionId: String) {
- val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
+ val content = WebSocketMessageDecoder.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
LocalDateTime.now()))
webSocketSender.sendMsg(content, sessionId)
}
--
Gitblit v1.9.3