src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,6 +1,7 @@ package cn.flightfeather.supervision.common.executor import cn.flightfeather.supervision.common.exception.BizException import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -62,7 +63,12 @@ } } else { task.ready() task.future = executorService.submit { task.execute() } task.future = executorService.submit { task.execute() println(task.taskStatus.runTime) SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) } SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) return task } } @@ -111,11 +117,13 @@ return if (id != null) { val task = taskMap[id] ?: throw BizException("æ æ³å ³éä»»å¡ï¼ä»»å¡[${id}]ä¸åå¨") task.shutdown() SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus) listOf(task.taskStatus) } else { val res = mutableListOf<BgTaskStatus?>() taskMap.forEach { (t, u) -> u.shutdown() SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus) res.add(u.taskStatus) } res.sortedBy { it?.createTime } src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
@@ -211,6 +211,17 @@ MULTI_MODE("multi_mode", "å¤é模å¼"), } // socketæ¶æ¯ç±»å enum class SocketMessageType(val value: Int, val des: String){ BG_TASK(1, "åå°ä»»å¡"), BUSINESS_LOG(2, "ä¸å¡æ¥å¿"), } // socketå¿è·³æ¶æ¯ç±»å enum class SocketHeartMessageType(val value: Int, val des: String){ HEART_MESSAGE_TYPE(0, "å¿è·³æºå¶") } companion object { //é®é¢å®¡æ ¸ src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,20 @@ package cn.flightfeather.supervision.common.utils; import cn.flightfeather.supervision.common.executor.BgTaskStatus; import cn.flightfeather.supervision.socket.WebSocketMessage; import cn.flightfeather.supervision.socket.WebSocketMessageUtil; import cn.flightfeather.supervision.socket.WebSocketSenderHandler; public class SendSocketMessageUtil { /** * åéåå°ä»»å¡çsocketæ¶æ¯ * @param bgTaskStatus æ¶æ¯çå 容 */ public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) { WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(), bgTaskStatus); String message = WebSocketMessageUtil.encodeMessage(webSocketMessage); WebSocketSenderHandler.getInstance().broadcast(message); } } src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,37 @@ package cn.flightfeather.supervision.socket; public class WebSocketMessage { /* * æ¶æ¯ç±»å * */ private int type; /* * æ¶æ¯å 容 * */ private Object content; public WebSocketMessage() { } public WebSocketMessage(int type, Object content) { this.type = type; this.content = content; } public int getType() { return type; } public void setType(int type) { this.type = type; } public Object getContent() { return content; } public void setContent(Object content) { this.content = content; } } src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
// [diff viewer] new file, hunk @@ -0,0 +1,52 @@
package cn.flightfeather.supervision.socket;

import cn.flightfeather.supervision.common.utils.JsonUtil;
import org.springframework.util.StringUtils;

import java.util.*;

/**
 * Encodes and decodes the text framing used for web socket messages:
 * {@code ##<type>&&<json content>%%}.
 */
public class WebSocketMessageUtil {

    private static final String START_STR = "##";
    private static final String SPLIT_STR = "&&";
    private static final String END_STR = "%%";

    /**
     * Validates the message framing.
     *
     * @param message the message
     * @return {@code true} when the message is non-empty, starts with {@code ##}
     *         and ends with {@code %%}
     */
    private static boolean verificationMessage(String message) {
        if (StringUtils.isEmpty(message)) {
            return false;
        }
        return message.startsWith(START_STR) && message.endsWith(END_STR);
    }

    /**
     * Parses the type and the content out of a framed message.
     *
     * @param message the data field of the socket message
     * @return the parsed message, or {@code null} when the format is incorrect
     *         (bad framing, missing separator, non-numeric type, or content that
     *         cannot be parsed)
     */
    public static WebSocketMessage decodeMessage(String message) {
        if (!verificationMessage(message)) {
            return null;
        }
        String body = message.substring(START_STR.length(), message.length() - END_STR.length());

        // Limit of 2: only the first separator divides type from content, so JSON
        // content that itself contains "&&" is kept whole. The limit also keeps a
        // trailing empty content part instead of dropping it.
        String[] parts = body.split(SPLIT_STR, 2);
        if (parts.length < 2) {
            // No separator at all: there is no content part to read.
            return null;
        }

        int type;
        try {
            type = Integer.parseInt(parts[0]);
        } catch (NumberFormatException ignored) {
            // A non-numeric type is a format error, reported as null per the contract.
            return null;
        }

        Object content;
        try {
            content = JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class);
        } catch (RuntimeException ignored) {
            // This file does not import the JSON library's exception types, so any
            // runtime failure while parsing the content is treated as a format error.
            return null;
        }

        return new WebSocketMessage(type, content);
    }

    /**
     * Generates a message string in the framed format.
     *
     * @param webSocketMessage the message whose type and content are written out
     * @return the generated message string, {@code ##<type>&&<json content>%%}
     */
    public static String encodeMessage(WebSocketMessage webSocketMessage) {
        return START_STR
                + webSocketMessage.getType()
                + SPLIT_STR
                + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent())
                + END_STR;
    }
}
// [diff viewer] next file: src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,19 @@ package cn.flightfeather.supervision.socket; import cn.flightfeather.supervision.socket.processor.WebSocketSender; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class WebSocketSenderHandler implements ApplicationContextAware { static private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { WebSocketSenderHandler.applicationContext = applicationContext; } public static WebSocketSender getInstance() { return applicationContext.getBean(WebSocketSender.class); } } src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,5 +1,7 @@ package cn.flightfeather.supervision.socket.config import cn.flightfeather.supervision.common.utils.Constant import cn.flightfeather.supervision.socket.WebSocketMessageUtil import cn.flightfeather.supervision.socket.WsSessionManager import cn.flightfeather.supervision.socket.processor.WebSocketReceiver import org.springframework.stereotype.Component @@ -45,6 +47,10 @@ val payload = message.payload val sessionId = session.attributes["session_id"] println("server æ¥æ¶å° $sessionId åéç $payload") if (WebSocketMessageUtil.decodeMessage(payload).type == Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) { webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString()) } webSocketReceiver.onReceiveMsg(payload) // session.sendMessage(TextMessage("server åéç» " + sessionId + " æ¶æ¯ " + payload + " " + LocalDateTime.now() // .toString())) src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -2,7 +2,11 @@ import cn.flightfeather.supervision.common.log.BizLog import cn.flightfeather.supervision.common.log.WorkStreamLogInfo import cn.flightfeather.supervision.common.utils.Constant import cn.flightfeather.supervision.socket.WebSocketMessage import cn.flightfeather.supervision.socket.WebSocketMessageUtil import org.springframework.stereotype.Component import java.time.LocalDateTime /** * webSocketæ¶æ¯æ¥æ¶ç®¡ç @@ -10,7 +14,7 @@ * @author feiyu02 */ @Component class WebSocketReceiver(private val bizLog: BizLog) { class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) { /** * æ¥æ¶æ¶æ¯å¤ç @@ -21,4 +25,13 @@ bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "æ±æ£å¼º", "å¨ä¸æµ·å¹¿åç²ç ¤ç°æéå ¬å¸æ°å¢ä¸ä¸ªé®é¢")) } } /** * æ¥æ¶å¿è·³æ¶æ¯å¤ç */ fun onReceiveHeartMsg(msg: String, sessionId: String) { val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value, LocalDateTime.now())) webSocketSender.sendMsg(content, sessionId) } }