From 0c59552dc14c9023d4c0a9d57509cce1d5a6d6da Mon Sep 17 00:00:00 2001 From: feiyu02 <risaku@163.com> Date: 星期二, 31 十二月 2024 10:07:45 +0800 Subject: [PATCH] Merge remote-tracking branch 'supervision/hc-socket-1129' --- src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java | 18 +++ src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java | 19 +++ src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt | 5 + src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java | 51 ++++++++++ src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt | 15 ++ src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java | 51 ++++++++++ src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java | 30 ++++++ src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt | 14 ++ src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt | 6 + src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt | 6 + src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt | 10 + src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java | 37 +++++++ src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt | 11 ++ 13 files changed, 265 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt index c8f23f6..492e15e 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt @@ -1,6 +1,7 @@ package cn.flightfeather.supervision.common.executor import cn.flightfeather.supervision.common.exception.BizException +import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -37,7 +38,10 @@ throw BizException("鏃犳硶閲嶅鍒涘缓浠诲姟") } } - val t = BgTask(type, id, name, task) + val t = BgTask(type, id, name, task) { status -> + // 鍙戦�佹秷鎭� + WebSocketSendMessageUtil.sendBgTaskMessage(status) + } taskSet[id] = t return t } @@ -62,7 +66,9 @@ } } else { task.ready() - task.future = executorService.submit { task.execute() } + task.future = executorService.submit { + task.execute() + } return task } } diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt index 66a9315..3a0b2b1 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt @@ -11,13 +11,14 @@ val id: String, val name: String, private val task: () -> Boolean, + private val onStatusChange: (status: BgTaskStatus) -> Unit ) { var taskStatus = BgTaskStatus(type, id, name) var future: Future<*>? = null fun ready() { - taskStatus.status = TaskStatus.RUNNING taskStatus.startTime = LocalDateTime.now() + setStatus(TaskStatus.RUNNING) } fun execute() { @@ -31,24 +32,29 @@ } fun success() { - taskStatus.status = TaskStatus.SUCCESS complete() + setStatus(TaskStatus.SUCCESS) } fun fail() { - taskStatus.status = TaskStatus.FAIL complete() + setStatus(TaskStatus.FAIL) } fun shutdown() { if (future?.isCancelled == false && !future!!.isDone) { future!!.cancel(true) } - taskStatus.status = TaskStatus.SHUTDOWN complete() + setStatus(TaskStatus.SHUTDOWN) } fun complete() { taskStatus.endTime = LocalDateTime.now() } + + fun setStatus(status: TaskStatus) { + taskStatus.status = status + onStatusChange(taskStatus) + } } \ No newline at end of file diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt index 60bfcf4..93d036a 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt @@ -1,5 +1,7 @@ package cn.flightfeather.supervision.common.executor +import cn.flightfeather.supervision.socket.LocalDateTimeAdapter +import com.google.gson.annotations.JsonAdapter import java.time.Duration import java.time.LocalDateTime @@ -15,12 +17,15 @@ var status: TaskStatus = TaskStatus.WAITING // 寮�濮嬫椂闂� + @JsonAdapter(LocalDateTimeAdapter::class) var startTime: LocalDateTime? = null // 缁撴潫鏃堕棿 + @JsonAdapter(LocalDateTimeAdapter::class) var endTime: LocalDateTime? = null // 鍒涘缓鏃堕棿 + @JsonAdapter(LocalDateTimeAdapter::class) var createTime: LocalDateTime = LocalDateTime.now() // 杩愯鏃堕暱锛堢锛� diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java new file mode 100644 index 0000000..6287678 --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java @@ -0,0 +1,51 @@ +package cn.flightfeather.supervision.common.executor; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import java.lang.reflect.Type; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * BgTaskStatus绫荤殑鑷畾涔夊簭鍒楀寲绫� 瑙e喅浜嗚绠楀睘鎬ф棤娉曞簭鍒楀寲鐨勯棶棰� + * by hc 2024.12.10 + */ +public class BgTaskStatusJsonSerializer implements JsonSerializer<BgTaskStatus> { + + @Override + public JsonElement serialize(BgTaskStatus bgTaskStatus, Type typeOfSrc, JsonSerializationContext context) { + JsonObject jsonObject = new JsonObject(); + // 搴忓垪鍖杢ype + jsonObject.addProperty("type", String.valueOf(bgTaskStatus.getType())); + // 搴忓垪鍖杋d + jsonObject.addProperty("id", bgTaskStatus.getId()); + // 搴忓垪鍖杗ame + jsonObject.addProperty("name", bgTaskStatus.getName()); + // 搴忓垪鍖杝tatus + jsonObject.addProperty("status", String.valueOf(bgTaskStatus.getStatus())); + // 搴忓垪鍖杝tartTime + jsonObject.addProperty("startTime", formatLocalDateTime(bgTaskStatus.getStartTime())); + // 搴忓垪鍖杄ndTime + jsonObject.addProperty("endTime", formatLocalDateTime(bgTaskStatus.getEndTime())); + // 搴忓垪鍖朿reateTime + jsonObject.addProperty("createTime", formatLocalDateTime(bgTaskStatus.getCreateTime())); + // 搴忓垪鍖� 璁$畻灞炴�unTime + jsonObject.addProperty("runTime", bgTaskStatus.getRunTime()); + // 搴忓垪鍖杄xtra + if (bgTaskStatus.getExtra() != null) { + jsonObject.add("extra", context.serialize(bgTaskStatus.getExtra())); + } + return jsonObject; + } + + private String formatLocalDateTime(LocalDateTime localDateTime) { + // 濡傛灉LocalDateTime涓簄ull锛屽垯杩斿洖null + if (localDateTime == null) { + return null; + } + // 鏍煎紡鍖朙ocalDateTime + return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(localDateTime); + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt index d250d1b..deb1ae8 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt @@ -211,6 +211,17 @@ MULTI_MODE("multi_mode", "澶氶�夋ā寮�"), } + // socket娑堟伅绫诲瀷 + enum class SocketMessageType(val value: Int, val des: String){ + BG_TASK(1, "鍚庡彴浠诲姟"), + BUSINESS_LOG(2, "涓氬姟鏃ュ織"), + } + + // socket蹇冭烦娑堟伅绫诲瀷 + enum class SocketHeartMessageType(val value: Int, val des: String){ + HEART_MESSAGE_TYPE(0, "蹇冭烦鏈哄埗") + } + companion object { //闂瀹℃牳 diff --git a/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt b/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt index f749e00..247b3a2 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt @@ -1,9 +1,13 @@ package cn.flightfeather.supervision.common.utils +import cn.flightfeather.supervision.common.executor.BgTaskStatus +import cn.flightfeather.supervision.common.executor.BgTaskStatusJsonSerializer import com.google.gson.Gson import com.google.gson.GsonBuilder object JsonUtil { - val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create() + val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss") + .registerTypeAdapter(BgTaskStatus::class.java, BgTaskStatusJsonSerializer()) + .create() } \ No newline at end of file diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java b/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java new file mode 100644 index 0000000..dedbf00 --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java @@ -0,0 +1,30 @@ +package cn.flightfeather.supervision.socket; + +import com.google.gson.*; + +import java.lang.reflect.Type; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * LocalDateTime绫诲瀷鐨勬椂闂存牸寮忓簭鍒楀寲鍜屽弽搴忓垪鍖栫被 + * by hc 2024.12.6 + */ +public class LocalDateTimeAdapter implements JsonDeserializer<LocalDateTime>, JsonSerializer<LocalDateTime> { + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + @Override + public JsonElement serialize(LocalDateTime src, Type typeOfSrc, JsonSerializationContext context) { + return new JsonPrimitive(dateTimeFormatter.format(src)); + } + + @Override + public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + try { + return LocalDateTime.parse(json.getAsString(), dateTimeFormatter); + } catch (Exception e) { + throw new JsonParseException(e); + } + } +} + diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java new file mode 100644 index 0000000..ea3c4f6 --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java @@ -0,0 +1,37 @@ +package cn.flightfeather.supervision.socket; + +public class WebSocketMessage { + /* + * 娑堟伅绫诲瀷 + * */ + private int type; + + /* + * 娑堟伅鍐呭 + * */ + private Object content; + + public WebSocketMessage() { + } + + public WebSocketMessage(int type, Object content) { + this.type = type; + this.content = content; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public Object getContent() { + return content; + } + + public void setContent(Object content) { + this.content = content; + } +} diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java new file mode 100644 index 0000000..e3ba70f --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java @@ -0,0 +1,51 @@ +package cn.flightfeather.supervision.socket; + +import cn.flightfeather.supervision.common.utils.JsonUtil; +import org.springframework.util.StringUtils; + +public class WebSocketMessageParser { + private static final String START_STR = "##"; + private static final String SPLIT_STR = "&&"; + private static final String END_STR = "%%"; + + /** + * 娑堟伅鏍煎紡鏍¢獙 + * @param message 娑堟伅 + * @return 鏍煎紡鏄惁鍚堣 + */ + private static boolean verificationMessage(String message) { + if (StringUtils.isEmpty(message)) { + return false; + } + if (!message.startsWith(START_STR)) { + return false; + } + if (!message.endsWith(END_STR)) { + return false; + } + return true; + } + /** + * 瑙f瀽鍑虹被鍨嬪拰鍐呭 + * @param message socket娑堟伅涓殑data瀛楁 + * @return 瑙f瀽缁撴灉锛屽鏋滄牸寮忎笉姝g‘鍒欒繑鍥瀗ull + */ + public static WebSocketMessage decodeMessage(String message) { + if (!verificationMessage(message)) { + // 鍙戞尌涓�涓笉浼氳澶勭悊鐨勬秷鎭� + return new WebSocketMessage(-1, ""); + } + WebSocketMessage webSocketMessage = new WebSocketMessage(); + String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR); + webSocketMessage.setType(Integer.parseInt(parts[0])); + webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class)); + return webSocketMessage; + } + /** + * 鐢熸垚鎸囧畾鏍煎紡鐨勬秷鎭瓧绗︿覆 + * @return 鐢熸垚鐨勬秷鎭瓧绗︿覆 + */ + public static String encodeMessage(WebSocketMessage webSocketMessage) { + return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR; + } +} diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java new file mode 100644 index 0000000..9c3b535 --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java @@ -0,0 +1,18 @@ +package cn.flightfeather.supervision.socket; + +import cn.flightfeather.supervision.common.executor.BgTaskStatus; +import cn.flightfeather.supervision.common.utils.Constant; + +public class WebSocketSendMessageUtil { + + /** + * 鍙戦�佸悗鍙颁换鍔$殑socket娑堟伅 + * @param bgTaskStatus 娑堟伅鐨勫唴瀹� + */ + public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) { + WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(), + bgTaskStatus); + String message = WebSocketMessageParser.encodeMessage(webSocketMessage); + WebSocketSenderHandler.getInstance().broadcast(message); + } +} diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java new file mode 100644 index 0000000..3ac4f5c --- /dev/null +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java @@ -0,0 +1,19 @@ +package cn.flightfeather.supervision.socket; + +import cn.flightfeather.supervision.socket.processor.WebSocketSender; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class WebSocketSenderHandler implements ApplicationContextAware { + static private ApplicationContext applicationContext; + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + WebSocketSenderHandler.applicationContext = applicationContext; + } + public static WebSocketSender getInstance() { + return applicationContext.getBean(WebSocketSender.class); + } +} diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt index 49bcb73..bf27c37 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt @@ -1,5 +1,7 @@ package cn.flightfeather.supervision.socket.config +import cn.flightfeather.supervision.common.utils.Constant +import cn.flightfeather.supervision.socket.WebSocketMessageParser import cn.flightfeather.supervision.socket.WsSessionManager import cn.flightfeather.supervision.socket.processor.WebSocketReceiver import org.springframework.stereotype.Component @@ -45,6 +47,10 @@ val payload = message.payload val sessionId = session.attributes["session_id"] println("server 鎺ユ敹鍒� $sessionId 鍙戦�佺殑 $payload") + if (WebSocketMessageParser.decodeMessage(payload).type == + Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) { + webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString()) + } webSocketReceiver.onReceiveMsg(payload) // session.sendMessage(TextMessage("server 鍙戦�佺粰 " + sessionId + " 娑堟伅 " + payload + " " + LocalDateTime.now() // .toString())) diff --git a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt index d8c42f9..6f6d837 100644 --- a/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt +++ b/src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt @@ -2,7 +2,11 @@ import cn.flightfeather.supervision.common.log.BizLog import cn.flightfeather.supervision.common.log.WorkStreamLogInfo +import cn.flightfeather.supervision.common.utils.Constant +import cn.flightfeather.supervision.socket.WebSocketMessage +import cn.flightfeather.supervision.socket.WebSocketMessageParser import org.springframework.stereotype.Component +import java.time.LocalDateTime /** * webSocket娑堟伅鎺ユ敹绠$悊 @@ -10,7 +14,7 @@ * @author feiyu02 */ @Component -class WebSocketReceiver(private val bizLog: BizLog) { +class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) { /** * 鎺ユ敹娑堟伅澶勭悊 @@ -21,4 +25,13 @@ bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "鏈辨寮�", "鍦ㄤ笂娴峰箍鍙戠矇鐓ょ伆鏈夐檺鍏徃鏂板涓�涓棶棰�")) } } + + /** + * 鎺ユ敹蹇冭烦娑堟伅澶勭悊 + */ + fun onReceiveHeartMsg(msg: String, sessionId: String) { + val content = WebSocketMessageParser.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value, + LocalDateTime.now())) + webSocketSender.sendMsg(content, sessionId) + } } \ No newline at end of file -- Gitblit v1.9.3