From c95f66f4b81e81df9432c0c6d306ad22e3f5e587 Mon Sep 17 00:00:00 2001
From: hcong <1050828145@qq.com>
Date: 星期一, 02 十二月 2024 11:41:09 +0800
Subject: [PATCH] socket以及后台任务状态实时刷新
---
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java | 19 ++++++
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt | 15 ++++
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt | 10 +++
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java | 37 ++++++++++++
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java | 52 +++++++++++++++++
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt | 11 +++
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt | 6 ++
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java | 20 ++++++
8 files changed, 168 insertions(+), 2 deletions(-)
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
index c8f23f6..730c3a6 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,6 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.common.exception.BizException
+import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -62,7 +63,12 @@
}
} else {
task.ready()
- task.future = executorService.submit { task.execute() }
+ task.future = executorService.submit {
+ task.execute()
+ println(task.taskStatus.runTime)
+ SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
+ }
+ SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
return task
}
}
@@ -111,11 +117,13 @@
return if (id != null) {
val task = taskMap[id] ?: throw BizException("鏃犳硶鍏抽棴浠诲姟锛屼换鍔${id}]涓嶅瓨鍦�")
task.shutdown()
+ SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
listOf(task.taskStatus)
} else {
val res = mutableListOf<BgTaskStatus?>()
taskMap.forEach { (t, u) ->
u.shutdown()
+ SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus)
res.add(u.taskStatus)
}
res.sortedBy { it?.createTime }
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
index d250d1b..deb1ae8 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
@@ -211,6 +211,17 @@
MULTI_MODE("multi_mode", "澶氶�夋ā寮�"),
}
+ // socket娑堟伅绫诲瀷
+ enum class SocketMessageType(val value: Int, val des: String){
+ BG_TASK(1, "鍚庡彴浠诲姟"),
+ BUSINESS_LOG(2, "涓氬姟鏃ュ織"),
+ }
+
+ // socket蹇冭烦娑堟伅绫诲瀷
+ enum class SocketHeartMessageType(val value: Int, val des: String){
+ HEART_MESSAGE_TYPE(0, "蹇冭烦鏈哄埗")
+ }
+
companion object {
//闂瀹℃牳
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
new file mode 100644
index 0000000..e4c9cfc
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
@@ -0,0 +1,20 @@
+package cn.flightfeather.supervision.common.utils;
+
+import cn.flightfeather.supervision.common.executor.BgTaskStatus;
+import cn.flightfeather.supervision.socket.WebSocketMessage;
+import cn.flightfeather.supervision.socket.WebSocketMessageUtil;
+import cn.flightfeather.supervision.socket.WebSocketSenderHandler;
+
+public class SendSocketMessageUtil {
+
+ /**
+ * 鍙戦�佸悗鍙颁换鍔$殑socket娑堟伅
+ * @param bgTaskStatus 娑堟伅鐨勫唴瀹�
+ */
+ public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
+ WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
+ bgTaskStatus);
+ String message = WebSocketMessageUtil.encodeMessage(webSocketMessage);
+ WebSocketSenderHandler.getInstance().broadcast(message);
+ }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
new file mode 100644
index 0000000..ea3c4f6
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
@@ -0,0 +1,37 @@
+package cn.flightfeather.supervision.socket;
+
+public class WebSocketMessage {
+ /*
+ * 娑堟伅绫诲瀷
+ * */
+ private int type;
+
+ /*
+ * 娑堟伅鍐呭
+ * */
+ private Object content;
+
+ public WebSocketMessage() {
+ }
+
+ public WebSocketMessage(int type, Object content) {
+ this.type = type;
+ this.content = content;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public void setType(int type) {
+ this.type = type;
+ }
+
+ public Object getContent() {
+ return content;
+ }
+
+ public void setContent(Object content) {
+ this.content = content;
+ }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
new file mode 100644
index 0000000..1a09218
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
@@ -0,0 +1,52 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.common.utils.JsonUtil;
+import org.springframework.util.StringUtils;
+
+import java.util.*;
+
+public class WebSocketMessageUtil {
+ private static final String START_STR = "##";
+ private static final String SPLIT_STR = "&&";
+ private static final String END_STR = "%%";
+
+ /**
+ * 娑堟伅鏍煎紡鏍¢獙
+ * @param message 娑堟伅
+ * @return 鏍煎紡鏄惁鍚堣
+ */
+ private static boolean verificationMessage(String message) {
+ if (StringUtils.isEmpty(message)) {
+ return false;
+ }
+ if (!message.startsWith(START_STR)) {
+ return false;
+ }
+ if (!message.endsWith(END_STR)) {
+ return false;
+ }
+ return true;
+ }
+ /**
+ * 瑙f瀽鍑虹被鍨嬪拰鍐呭
+ * @param message socket娑堟伅涓殑data瀛楁
+ * @return 瑙f瀽缁撴灉锛屽鏋滄牸寮忎笉姝g‘鍒欒繑鍥瀗ull
+ */
+ public static WebSocketMessage decodeMessage(String message) {
+ if (!verificationMessage(message)) {
+ return null;
+ }
+ WebSocketMessage webSocketMessage = new WebSocketMessage();
+ String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
+ webSocketMessage.setType(Integer.parseInt(parts[0]));
+ webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class));
+ return webSocketMessage;
+ }
+ /**
+ * 鐢熸垚鎸囧畾鏍煎紡鐨勬秷鎭瓧绗︿覆
+ * @return 鐢熸垚鐨勬秷鎭瓧绗︿覆
+ */
+ public static String encodeMessage(WebSocketMessage webSocketMessage) {
+ return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent()) + END_STR;
+ }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
new file mode 100644
index 0000000..3ac4f5c
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
@@ -0,0 +1,19 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.socket.processor.WebSocketSender;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WebSocketSenderHandler implements ApplicationContextAware {
+ static private ApplicationContext applicationContext;
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ WebSocketSenderHandler.applicationContext = applicationContext;
+ }
+ public static WebSocketSender getInstance() {
+ return applicationContext.getBean(WebSocketSender.class);
+ }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
index 49bcb73..8c3df27 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,5 +1,7 @@
package cn.flightfeather.supervision.socket.config
+import cn.flightfeather.supervision.common.utils.Constant
+import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import cn.flightfeather.supervision.socket.WsSessionManager
import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
import org.springframework.stereotype.Component
@@ -45,6 +47,10 @@
val payload = message.payload
val sessionId = session.attributes["session_id"]
println("server 鎺ユ敹鍒� $sessionId 鍙戦�佺殑 $payload")
+ if (WebSocketMessageUtil.decodeMessage(payload).type ==
+ Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
+ webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
+ }
webSocketReceiver.onReceiveMsg(payload)
// session.sendMessage(TextMessage("server 鍙戦�佺粰 " + sessionId + " 娑堟伅 " + payload + " " + LocalDateTime.now()
// .toString()))
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
index d8c42f9..11b1c62 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -2,7 +2,11 @@
import cn.flightfeather.supervision.common.log.BizLog
import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
+import cn.flightfeather.supervision.common.utils.Constant
+import cn.flightfeather.supervision.socket.WebSocketMessage
+import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import org.springframework.stereotype.Component
+import java.time.LocalDateTime
/**
* webSocket娑堟伅鎺ユ敹绠$悊
@@ -10,7 +14,7 @@
* @author feiyu02
*/
@Component
-class WebSocketReceiver(private val bizLog: BizLog) {
+class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) {
/**
* 鎺ユ敹娑堟伅澶勭悊
@@ -21,4 +25,13 @@
bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "鏈辨寮�", "鍦ㄤ笂娴峰箍鍙戠矇鐓ょ伆鏈夐檺鍏徃鏂板涓�涓棶棰�"))
}
}
+
+ /**
+ * 鎺ユ敹蹇冭烦娑堟伅澶勭悊
+ */
+ fun onReceiveHeartMsg(msg: String, sessionId: String) {
+ val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
+ LocalDateTime.now()))
+ webSocketSender.sendMsg(content, sessionId)
+ }
}
\ No newline at end of file
--
Gitblit v1.9.3