src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,7 +1,7 @@ package cn.flightfeather.supervision.common.executor import cn.flightfeather.supervision.common.exception.BizException import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -38,7 +38,10 @@ throw BizException("无法重复创建任务") } } val t = BgTask(type, id, name, task) val t = BgTask(type, id, name, task) { status -> // 发送消息 WebSocketSendMessageUtil.sendBgTaskMessage(status) } taskSet[id] = t return t } @@ -65,10 +68,7 @@ task.ready() task.future = executorService.submit { task.execute() println(task.taskStatus.runTime) SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) } SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) return task } } @@ -117,13 +117,11 @@ return if (id != null) { val task = taskMap[id] ?: throw BizException("无法关闭任务，任务[${id}]不存在") task.shutdown() SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) listOf(task.taskStatus) } else { val res = mutableListOf<BgTaskStatus?>() taskMap.forEach { (t, u) -> u.shutdown() SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus) res.add(u.taskStatus) } res.sortedBy { it?.createTime } src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
@@ -11,6 +11,7 @@ val id: String, val name: String, private val task: () -> Boolean, private val onStatusChange: (status: BgTaskStatus) -> Unit ) { var taskStatus = BgTaskStatus(type, id, name) var future: Future<*>? = null @@ -33,11 +34,13 @@ fun success() { taskStatus.status = TaskStatus.SUCCESS complete() onStatusChange(taskStatus) } fun fail() { taskStatus.status = TaskStatus.FAIL complete() onStatusChange(taskStatus) } fun shutdown() { src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
文件已删除 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java
文件名从 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java 修改 @@ -3,9 +3,7 @@ import cn.flightfeather.supervision.common.utils.JsonUtil; import org.springframework.util.StringUtils; import java.util.*; public class WebSocketMessageUtil { public class WebSocketMessageDecoder { private static final String START_STR = "##"; private static final String SPLIT_STR = "&&"; private static final String END_STR = "%%"; @@ -34,7 +32,8 @@ */ public static WebSocketMessage decodeMessage(String message) { if (!verificationMessage(message)) { return null; // 返回一个不会被处理的消息 return new WebSocketMessage(-1, ""); } WebSocketMessage webSocketMessage = new WebSocketMessage(); String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR); @@ -47,6 +46,6 @@ * @return 生成的消息字符串 */ public static String encodeMessage(WebSocketMessage webSocketMessage) { return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent()) + END_STR; return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR; } } src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
对比新文件 @@ -0,0 +1,18 @@ package cn.flightfeather.supervision.socket; import cn.flightfeather.supervision.common.executor.BgTaskStatus; import cn.flightfeather.supervision.common.utils.Constant; public class WebSocketSendMessageUtil { /** * 发送后台任务的socket消息 * @param bgTaskStatus 消息的内容 */ public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) { WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(), bgTaskStatus); String message = WebSocketMessageDecoder.encodeMessage(webSocketMessage); WebSocketSenderHandler.getInstance().broadcast(message); } } src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,7 +1,7 @@ package cn.flightfeather.supervision.socket.config import cn.flightfeather.supervision.common.utils.Constant import cn.flightfeather.supervision.socket.WebSocketMessageUtil import cn.flightfeather.supervision.socket.WebSocketMessageDecoder import cn.flightfeather.supervision.socket.WsSessionManager import cn.flightfeather.supervision.socket.processor.WebSocketReceiver import org.springframework.stereotype.Component @@ -47,7 +47,7 @@ val payload = message.payload val sessionId = session.attributes["session_id"] println("server 接收到 $sessionId 发送的 $payload") if (WebSocketMessageUtil.decodeMessage(payload).type == if (WebSocketMessageDecoder.decodeMessage(payload).type == Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) { webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString()) } src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -4,7 +4,7 @@ import cn.flightfeather.supervision.common.log.WorkStreamLogInfo import cn.flightfeather.supervision.common.utils.Constant import cn.flightfeather.supervision.socket.WebSocketMessage import cn.flightfeather.supervision.socket.WebSocketMessageUtil import cn.flightfeather.supervision.socket.WebSocketMessageDecoder import org.springframework.stereotype.Component import java.time.LocalDateTime @@ -30,7 +30,7 @@ * 接收心跳消息处理 */ fun onReceiveHeartMsg(msg: String, sessionId: String) { val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value, val content = WebSocketMessageDecoder.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value, LocalDateTime.now())) webSocketSender.sendMsg(content, sessionId) }