hcong
2024-12-02 c95f66f4b81e81df9432c0c6d306ad22e3f5e587
socket以及后台任务状态实时刷新
已修改4个文件
已添加4个文件
170 ■■■■■ 文件已修改
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,6 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.common.exception.BizException
import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -62,7 +63,12 @@
            }
        } else {
            task.ready()
            task.future = executorService.submit { task.execute() }
            task.future = executorService.submit {
                task.execute()
                println(task.taskStatus.runTime)
                SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            }
            SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            return task
        }
    }
@@ -111,11 +117,13 @@
        return if (id != null) {
            val task = taskMap[id] ?: throw BizException("无法关闭任务,任务[${id}]不存在")
            task.shutdown()
            SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            listOf(task.taskStatus)
        } else {
            val res = mutableListOf<BgTaskStatus?>()
            taskMap.forEach { (t, u) ->
                u.shutdown()
                SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus)
                res.add(u.taskStatus)
            }
            res.sortedBy { it?.createTime }
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
@@ -211,6 +211,17 @@
        MULTI_MODE("multi_mode", "多选模式"),
    }
    // socket消息类型
    enum class SocketMessageType(val value: Int, val des: String){
        BG_TASK(1, "后台任务"),
        BUSINESS_LOG(2, "业务日志"),
    }
    // socket心跳消息类型
    enum class SocketHeartMessageType(val value: Int, val des: String){
        HEART_MESSAGE_TYPE(0, "心跳机制")
    }
    companion object {
        //问题审核
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,20 @@
package cn.flightfeather.supervision.common.utils;
import cn.flightfeather.supervision.common.executor.BgTaskStatus;
import cn.flightfeather.supervision.socket.WebSocketMessage;
import cn.flightfeather.supervision.socket.WebSocketMessageUtil;
import cn.flightfeather.supervision.socket.WebSocketSenderHandler;
public class SendSocketMessageUtil {
    /**
     * å‘送后台任务的socket消息
     * @param bgTaskStatus æ¶ˆæ¯çš„内容
     */
    public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
        WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
                bgTaskStatus);
        String message = WebSocketMessageUtil.encodeMessage(webSocketMessage);
        WebSocketSenderHandler.getInstance().broadcast(message);
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,37 @@
package cn.flightfeather.supervision.socket;
public class WebSocketMessage {
    /*
    * æ¶ˆæ¯ç±»åž‹
    * */
    private int type;
    /*
    * æ¶ˆæ¯å†…容
    * */
    private Object content;
    public WebSocketMessage() {
    }
    public WebSocketMessage(int type, Object content) {
        this.type = type;
        this.content = content;
    }
    public int getType() {
        return type;
    }
    public void setType(int type) {
        this.type = type;
    }
    public Object getContent() {
        return content;
    }
    public void setContent(Object content) {
        this.content = content;
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,52 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.common.utils.JsonUtil;
import org.springframework.util.StringUtils;
import java.util.*;
public class WebSocketMessageUtil {
    private static final String START_STR = "##";
    private static final String SPLIT_STR = "&&";
    private static final String END_STR = "%%";
    /**
     * æ¶ˆæ¯æ ¼å¼æ ¡éªŒ
     * @param message æ¶ˆæ¯
     * @return æ ¼å¼æ˜¯å¦åˆè§„
     */
    private static boolean verificationMessage(String message) {
        if (StringUtils.isEmpty(message)) {
            return false;
        }
        if (!message.startsWith(START_STR)) {
            return false;
        }
        if (!message.endsWith(END_STR)) {
            return false;
        }
        return true;
    }
    /**
     * è§£æžå‡ºç±»åž‹å’Œå†…容
     * @param message socket消息中的data字段
     * @return è§£æžç»“果,如果格式不正确则返回null
     */
    public static WebSocketMessage decodeMessage(String message) {
        if (!verificationMessage(message)) {
            return null;
        }
        WebSocketMessage webSocketMessage = new WebSocketMessage();
        String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
        webSocketMessage.setType(Integer.parseInt(parts[0]));
        webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class));
        return webSocketMessage;
    }
    /**
     * ç”ŸæˆæŒ‡å®šæ ¼å¼çš„æ¶ˆæ¯å­—符串
     * @return ç”Ÿæˆçš„æ¶ˆæ¯å­—符串
     */
    public static String encodeMessage(WebSocketMessage webSocketMessage) {
        return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent()) + END_STR;
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,19 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.socket.processor.WebSocketSender;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class WebSocketSenderHandler implements ApplicationContextAware {
    static private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        WebSocketSenderHandler.applicationContext = applicationContext;
    }
    public static WebSocketSender getInstance() {
        return applicationContext.getBean(WebSocketSender.class);
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,5 +1,7 @@
package cn.flightfeather.supervision.socket.config
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import cn.flightfeather.supervision.socket.WsSessionManager
import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
import org.springframework.stereotype.Component
@@ -45,6 +47,10 @@
        val payload = message.payload
        val sessionId = session.attributes["session_id"]
        println("server æŽ¥æ”¶åˆ° $sessionId å‘送的 $payload")
        if (WebSocketMessageUtil.decodeMessage(payload).type ==
            Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
            webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
        }
        webSocketReceiver.onReceiveMsg(payload)
//        session.sendMessage(TextMessage("server å‘送给 " + sessionId + " æ¶ˆæ¯ " + payload + " " + LocalDateTime.now()
//            .toString()))
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -2,7 +2,11 @@
import cn.flightfeather.supervision.common.log.BizLog
import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessage
import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import org.springframework.stereotype.Component
import java.time.LocalDateTime
/**
 * webSocket消息接收管理
@@ -10,7 +14,7 @@
 * @author feiyu02
 */
@Component
class WebSocketReceiver(private val bizLog: BizLog) {
class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) {
    /**
     * æŽ¥æ”¶æ¶ˆæ¯å¤„理
@@ -21,4 +25,13 @@
            bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "朱正强", "在上海广发粉煤灰有限公司新增一个问题"))
        }
    }
    /**
     * æŽ¥æ”¶å¿ƒè·³æ¶ˆæ¯å¤„理
     */
    fun onReceiveHeartMsg(msg: String, sessionId: String) {
        val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
            LocalDateTime.now()))
        webSocketSender.sendMsg(content, sessionId)
    }
}