From 0c59552dc14c9023d4c0a9d57509cce1d5a6d6da Mon Sep 17 00:00:00 2001
From: feiyu02 <risaku@163.com>
Date: 星期二, 31 十二月 2024 10:07:45 +0800
Subject: [PATCH] Merge remote-tracking branch 'supervision/hc-socket-1129'

---
 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java            |   18 +++
 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java              |   19 +++
 src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt                 |    5 +
 src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java |   51 ++++++++++
 src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt           |   15 ++
 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java              |   51 ++++++++++
 src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java                |   30 ++++++
 src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt                       |   14 ++
 src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt         |    6 +
 src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt                        |    6 +
 src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt           |   10 +
 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java                    |   37 +++++++
 src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt                        |   11 ++
 13 files changed, 265 insertions(+), 8 deletions(-)

diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
index c8f23f6..492e15e 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,6 +1,7 @@
 package cn.flightfeather.supervision.common.executor
 
 import cn.flightfeather.supervision.common.exception.BizException
+import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil
 import org.springframework.stereotype.Component
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.Executors
@@ -37,7 +38,10 @@
                 throw BizException("鏃犳硶閲嶅鍒涘缓浠诲姟")
             }
         }
-        val t = BgTask(type, id, name, task)
+        val t = BgTask(type, id, name, task) { status ->
+            // 鍙戦�佹秷鎭�
+            WebSocketSendMessageUtil.sendBgTaskMessage(status)
+        }
         taskSet[id] = t
         return t
     }
@@ -62,7 +66,9 @@
             }
         } else {
             task.ready()
-            task.future = executorService.submit { task.execute() }
+            task.future = executorService.submit {
+                task.execute()
+            }
             return task
         }
     }
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
index 66a9315..3a0b2b1 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
@@ -11,13 +11,14 @@
     val id: String,
     val name: String,
     private val task: () -> Boolean,
+    private val onStatusChange: (status: BgTaskStatus) -> Unit
 ) {
     var taskStatus = BgTaskStatus(type, id, name)
     var future: Future<*>? = null
 
     fun ready() {
-        taskStatus.status = TaskStatus.RUNNING
         taskStatus.startTime = LocalDateTime.now()
+        setStatus(TaskStatus.RUNNING)
     }
 
     fun execute() {
@@ -31,24 +32,29 @@
     }
 
     fun success() {
-        taskStatus.status = TaskStatus.SUCCESS
         complete()
+        setStatus(TaskStatus.SUCCESS)
     }
 
     fun fail() {
-        taskStatus.status = TaskStatus.FAIL
         complete()
+        setStatus(TaskStatus.FAIL)
     }
 
     fun shutdown() {
         if (future?.isCancelled == false && !future!!.isDone) {
             future!!.cancel(true)
         }
-        taskStatus.status = TaskStatus.SHUTDOWN
         complete()
+        setStatus(TaskStatus.SHUTDOWN)
     }
 
     fun complete() {
         taskStatus.endTime = LocalDateTime.now()
     }
+
+    fun setStatus(status: TaskStatus) {
+        taskStatus.status = status
+        onStatusChange(taskStatus)
+    }
 }
\ No newline at end of file
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt
index 60bfcf4..93d036a 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt
@@ -1,5 +1,7 @@
 package cn.flightfeather.supervision.common.executor
 
+import cn.flightfeather.supervision.socket.LocalDateTimeAdapter
+import com.google.gson.annotations.JsonAdapter
 import java.time.Duration
 import java.time.LocalDateTime
 
@@ -15,12 +17,15 @@
     var status: TaskStatus = TaskStatus.WAITING
 
     //    寮�濮嬫椂闂�
+    @JsonAdapter(LocalDateTimeAdapter::class)
     var startTime: LocalDateTime? = null
 
     //    缁撴潫鏃堕棿
+    @JsonAdapter(LocalDateTimeAdapter::class)
     var endTime: LocalDateTime? = null
 
     //    鍒涘缓鏃堕棿
+    @JsonAdapter(LocalDateTimeAdapter::class)
     var createTime: LocalDateTime = LocalDateTime.now()
 
     //    杩愯鏃堕暱锛堢锛�
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java
new file mode 100644
index 0000000..6287678
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java
@@ -0,0 +1,51 @@
+package cn.flightfeather.supervision.common.executor;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * BgTaskStatus绫荤殑鑷畾涔夊簭鍒楀寲绫� 瑙e喅浜嗚绠楀睘鎬ф棤娉曞簭鍒楀寲鐨勯棶棰�
+ * by hc 2024.12.10
+ */
+public class BgTaskStatusJsonSerializer implements JsonSerializer<BgTaskStatus> {
+
+    @Override
+    public JsonElement serialize(BgTaskStatus bgTaskStatus, Type typeOfSrc, JsonSerializationContext context) {
+        JsonObject jsonObject = new JsonObject();
+        // 搴忓垪鍖杢ype
+        jsonObject.addProperty("type", String.valueOf(bgTaskStatus.getType()));
+        // 搴忓垪鍖杋d
+        jsonObject.addProperty("id", bgTaskStatus.getId());
+        // 搴忓垪鍖杗ame
+        jsonObject.addProperty("name", bgTaskStatus.getName());
+        // 搴忓垪鍖杝tatus
+        jsonObject.addProperty("status", String.valueOf(bgTaskStatus.getStatus()));
+        // 搴忓垪鍖杝tartTime
+        jsonObject.addProperty("startTime", formatLocalDateTime(bgTaskStatus.getStartTime()));
+        // 搴忓垪鍖杄ndTime
+        jsonObject.addProperty("endTime", formatLocalDateTime(bgTaskStatus.getEndTime()));
+        // 搴忓垪鍖朿reateTime
+        jsonObject.addProperty("createTime", formatLocalDateTime(bgTaskStatus.getCreateTime()));
+        // 搴忓垪鍖� 璁$畻灞炴�unTime
+        jsonObject.addProperty("runTime", bgTaskStatus.getRunTime());
+        // 搴忓垪鍖杄xtra
+        if (bgTaskStatus.getExtra() != null) {
+            jsonObject.add("extra", context.serialize(bgTaskStatus.getExtra()));
+        }
+        return jsonObject;
+    }
+
+    private String formatLocalDateTime(LocalDateTime localDateTime) {
+        // 濡傛灉LocalDateTime涓簄ull锛屽垯杩斿洖null
+        if (localDateTime == null) {
+            return null;
+        }
+        // 鏍煎紡鍖朙ocalDateTime
+        return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(localDateTime);
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
index d250d1b..deb1ae8 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
@@ -211,6 +211,17 @@
         MULTI_MODE("multi_mode", "澶氶�夋ā寮�"),
     }
 
+    // socket娑堟伅绫诲瀷
+    enum class SocketMessageType(val value: Int, val des: String){
+        BG_TASK(1, "鍚庡彴浠诲姟"),
+        BUSINESS_LOG(2, "涓氬姟鏃ュ織"),
+    }
+
+    // socket蹇冭烦娑堟伅绫诲瀷
+    enum class SocketHeartMessageType(val value: Int, val des: String){
+        HEART_MESSAGE_TYPE(0, "蹇冭烦鏈哄埗")
+    }
+
 
     companion object {
         //闂瀹℃牳
diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt b/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt
index f749e00..247b3a2 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt
@@ -1,9 +1,13 @@
 package cn.flightfeather.supervision.common.utils
 
+import cn.flightfeather.supervision.common.executor.BgTaskStatus
+import cn.flightfeather.supervision.common.executor.BgTaskStatusJsonSerializer
 import com.google.gson.Gson
 import com.google.gson.GsonBuilder
 
 object JsonUtil {
 
-    val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create()
+    val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss")
+        .registerTypeAdapter(BgTaskStatus::class.java, BgTaskStatusJsonSerializer())
+        .create()
 }
\ No newline at end of file
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java b/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java
new file mode 100644
index 0000000..dedbf00
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java
@@ -0,0 +1,30 @@
+package cn.flightfeather.supervision.socket;
+
+import com.google.gson.*;
+
+import java.lang.reflect.Type;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * LocalDateTime绫诲瀷鐨勬椂闂存牸寮忓簭鍒楀寲鍜屽弽搴忓垪鍖栫被
+ * by hc 2024.12.6
+ */
+public class LocalDateTimeAdapter implements JsonDeserializer<LocalDateTime>, JsonSerializer<LocalDateTime> {
+    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    @Override
+    public JsonElement serialize(LocalDateTime src, Type typeOfSrc, JsonSerializationContext context) {
+        return new JsonPrimitive(dateTimeFormatter.format(src));
+    }
+
+    @Override
+    public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+        try {
+            return LocalDateTime.parse(json.getAsString(), dateTimeFormatter);
+        } catch (Exception e) {
+            throw new JsonParseException(e);
+        }
+    }
+}
+
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
new file mode 100644
index 0000000..ea3c4f6
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
@@ -0,0 +1,37 @@
+package cn.flightfeather.supervision.socket;
+
+public class WebSocketMessage {
+    /*
+    * 娑堟伅绫诲瀷
+    * */
+    private int type;
+
+    /*
+    * 娑堟伅鍐呭
+    * */
+    private Object content;
+
+    public WebSocketMessage() {
+    }
+
+    public WebSocketMessage(int type, Object content) {
+        this.type = type;
+        this.content = content;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public void setType(int type) {
+        this.type = type;
+    }
+
+    public Object getContent() {
+        return content;
+    }
+
+    public void setContent(Object content) {
+        this.content = content;
+    }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java
new file mode 100644
index 0000000..e3ba70f
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java
@@ -0,0 +1,51 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.common.utils.JsonUtil;
+import org.springframework.util.StringUtils;
+
+public class WebSocketMessageParser {
+    private static final String START_STR = "##";
+    private static final String SPLIT_STR = "&&";
+    private static final String END_STR = "%%";
+
+    /**
+     * 娑堟伅鏍煎紡鏍¢獙
+     * @param message 娑堟伅
+     * @return 鏍煎紡鏄惁鍚堣
+     */
+    private static boolean verificationMessage(String message) {
+        if (StringUtils.isEmpty(message)) {
+            return false;
+        }
+        if (!message.startsWith(START_STR)) {
+            return false;
+        }
+        if (!message.endsWith(END_STR)) {
+            return false;
+        }
+        return true;
+    }
+    /**
+     * 瑙f瀽鍑虹被鍨嬪拰鍐呭
+     * @param message socket娑堟伅涓殑data瀛楁
+     * @return 瑙f瀽缁撴灉锛屽鏋滄牸寮忎笉姝g‘鍒欒繑鍥瀗ull
+     */
+    public static WebSocketMessage decodeMessage(String message) {
+        if (!verificationMessage(message)) {
+            // 鍙戞尌涓�涓笉浼氳澶勭悊鐨勬秷鎭�
+            return new WebSocketMessage(-1, "");
+        }
+        WebSocketMessage webSocketMessage = new WebSocketMessage();
+        String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
+        webSocketMessage.setType(Integer.parseInt(parts[0]));
+        webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class));
+        return webSocketMessage;
+    }
+    /**
+     * 鐢熸垚鎸囧畾鏍煎紡鐨勬秷鎭瓧绗︿覆
+     * @return 鐢熸垚鐨勬秷鎭瓧绗︿覆
+     */
+    public static String encodeMessage(WebSocketMessage webSocketMessage) {
+        return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR;
+    }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
new file mode 100644
index 0000000..9c3b535
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
@@ -0,0 +1,18 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.common.executor.BgTaskStatus;
+import cn.flightfeather.supervision.common.utils.Constant;
+
+public class WebSocketSendMessageUtil {
+
+    /**
+     * 鍙戦�佸悗鍙颁换鍔$殑socket娑堟伅
+     * @param bgTaskStatus 娑堟伅鐨勫唴瀹�
+     */
+    public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
+        WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
+                bgTaskStatus);
+        String message = WebSocketMessageParser.encodeMessage(webSocketMessage);
+        WebSocketSenderHandler.getInstance().broadcast(message);
+    }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
new file mode 100644
index 0000000..3ac4f5c
--- /dev/null
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
@@ -0,0 +1,19 @@
+package cn.flightfeather.supervision.socket;
+
+import cn.flightfeather.supervision.socket.processor.WebSocketSender;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WebSocketSenderHandler implements ApplicationContextAware {
+    static private ApplicationContext applicationContext;
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        WebSocketSenderHandler.applicationContext = applicationContext;
+    }
+    public static WebSocketSender getInstance() {
+        return applicationContext.getBean(WebSocketSender.class);
+    }
+}
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
index 49bcb73..bf27c37 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,5 +1,7 @@
 package cn.flightfeather.supervision.socket.config
 
+import cn.flightfeather.supervision.common.utils.Constant
+import cn.flightfeather.supervision.socket.WebSocketMessageParser
 import cn.flightfeather.supervision.socket.WsSessionManager
 import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
 import org.springframework.stereotype.Component
@@ -45,6 +47,10 @@
         val payload = message.payload
         val sessionId = session.attributes["session_id"]
         println("server 鎺ユ敹鍒� $sessionId 鍙戦�佺殑 $payload")
+        if (WebSocketMessageParser.decodeMessage(payload).type ==
+            Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
+            webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
+        }
         webSocketReceiver.onReceiveMsg(payload)
 //        session.sendMessage(TextMessage("server 鍙戦�佺粰 " + sessionId + " 娑堟伅 " + payload + " " + LocalDateTime.now()
 //            .toString()))
diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
index d8c42f9..6f6d837 100644
--- a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
+++ b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -2,7 +2,11 @@
 
 import cn.flightfeather.supervision.common.log.BizLog
 import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
+import cn.flightfeather.supervision.common.utils.Constant
+import cn.flightfeather.supervision.socket.WebSocketMessage
+import cn.flightfeather.supervision.socket.WebSocketMessageParser
 import org.springframework.stereotype.Component
+import java.time.LocalDateTime
 
 /**
  * webSocket娑堟伅鎺ユ敹绠$悊
@@ -10,7 +14,7 @@
  * @author feiyu02
  */
 @Component
-class WebSocketReceiver(private val bizLog: BizLog) {
+class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) {
 
     /**
      * 鎺ユ敹娑堟伅澶勭悊
@@ -21,4 +25,13 @@
             bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "鏈辨寮�", "鍦ㄤ笂娴峰箍鍙戠矇鐓ょ伆鏈夐檺鍏徃鏂板涓�涓棶棰�"))
         }
     }
+
+    /**
+     * 鎺ユ敹蹇冭烦娑堟伅澶勭悊
+     */
+    fun onReceiveHeartMsg(msg: String, sessionId: String) {
+        val content = WebSocketMessageParser.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
+            LocalDateTime.now()))
+        webSocketSender.sendMsg(content, sessionId)
+    }
 }
\ No newline at end of file

--
Gitblit v1.9.3