1. BgTask增加onStatusChange参数,通过外界传递如何发送状态改变的消息
2. BackgroundTaskCtrl.kt 创建了BgTask对象并传递了onStatusChange参数
3. WebSocketMessageUtil 类名修改为 WebSocketMessageDecoder
4. SendSocketMessageUtil 类名修改为 WebSocketSendMessageUtil
已修改4个文件
已删除1个文件
已添加1个文件
已重命名1个文件
70 ■■■■ 文件已修改
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,7 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.common.exception.BizException
import cn.flightfeather.supervision.common.utils.SendSocketMessageUtil
import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -38,7 +38,10 @@
                throw BizException("无法重复创建任务")
            }
        }
        val t = BgTask(type, id, name, task)
        val t = BgTask(type, id, name, task) { status ->
            // å‘送消息
            WebSocketSendMessageUtil.sendBgTaskMessage(status)
        }
        taskSet[id] = t
        return t
    }
@@ -65,10 +68,7 @@
            task.ready()
            task.future = executorService.submit {
                task.execute()
                println(task.taskStatus.runTime)
                SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            }
            SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            return task
        }
    }
@@ -117,13 +117,11 @@
        return if (id != null) {
            val task = taskMap[id] ?: throw BizException("无法关闭任务,任务[${id}]不存在")
            task.shutdown()
            SendSocketMessageUtil.sendBgTaskMessage(task.taskStatus)
            listOf(task.taskStatus)
        } else {
            val res = mutableListOf<BgTaskStatus?>()
            taskMap.forEach { (t, u) ->
                u.shutdown()
                SendSocketMessageUtil.sendBgTaskMessage(u.taskStatus)
                res.add(u.taskStatus)
            }
            res.sortedBy { it?.createTime }
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
@@ -11,6 +11,7 @@
    val id: String,
    val name: String,
    private val task: () -> Boolean,
    private val onStatusChange: (status: BgTaskStatus) -> Unit
) {
    var taskStatus = BgTaskStatus(type, id, name)
    var future: Future<*>? = null
@@ -33,11 +34,13 @@
    fun success() {
        taskStatus.status = TaskStatus.SUCCESS
        complete()
        onStatusChange(taskStatus)
    }
    fun fail() {
        taskStatus.status = TaskStatus.FAIL
        complete()
        onStatusChange(taskStatus)
    }
    fun shutdown() {
src/main/kotlin/cn/flightfeather/supervision/common/utils/SendSocketMessageUtil.java
文件已删除
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageDecoder.java
文件名从 src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageUtil.java 修改
@@ -3,9 +3,7 @@
import cn.flightfeather.supervision.common.utils.JsonUtil;
import org.springframework.util.StringUtils;
import java.util.*;
public class WebSocketMessageUtil {
public class WebSocketMessageDecoder {
    private static final String START_STR = "##";
    private static final String SPLIT_STR = "&&";
    private static final String END_STR = "%%";
@@ -34,7 +32,8 @@
     */
    public static WebSocketMessage decodeMessage(String message) {
        if (!verificationMessage(message)) {
            return null;
            // å‘挥一个不会被处理的消息
            return new WebSocketMessage(-1, "");
        }
        WebSocketMessage webSocketMessage = new WebSocketMessage();
        String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
@@ -47,6 +46,6 @@
     * @return 生成的消息字符串
     */
    public static String encodeMessage(WebSocketMessage webSocketMessage) {
        return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent()) + END_STR;
        return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR;
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
对比新文件
@@ -0,0 +1,18 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.common.executor.BgTaskStatus;
import cn.flightfeather.supervision.common.utils.Constant;
public class WebSocketSendMessageUtil {
    /**
     * å‘送后台任务的socket消息
     * @param bgTaskStatus æ¶ˆæ¯çš„内容
     */
    public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
        WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
                bgTaskStatus);
        String message = WebSocketMessageDecoder.encodeMessage(webSocketMessage);
        WebSocketSenderHandler.getInstance().broadcast(message);
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,7 +1,7 @@
package cn.flightfeather.supervision.socket.config
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import cn.flightfeather.supervision.socket.WebSocketMessageDecoder
import cn.flightfeather.supervision.socket.WsSessionManager
import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
import org.springframework.stereotype.Component
@@ -47,7 +47,7 @@
        val payload = message.payload
        val sessionId = session.attributes["session_id"]
        println("server 接收到 $sessionId 发送的 $payload")
        if (WebSocketMessageUtil.decodeMessage(payload).type ==
        if (WebSocketMessageDecoder.decodeMessage(payload).type ==
            Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
            webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
        }
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -4,7 +4,7 @@
import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessage
import cn.flightfeather.supervision.socket.WebSocketMessageUtil
import cn.flightfeather.supervision.socket.WebSocketMessageDecoder
import org.springframework.stereotype.Component
import java.time.LocalDateTime
@@ -30,7 +30,7 @@
     * 接收心跳消息处理
     */
    fun onReceiveHeartMsg(msg: String, sessionId: String) {
        val content = WebSocketMessageUtil.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
        val content = WebSocketMessageDecoder.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
            LocalDateTime.now()))
        webSocketSender.sendMsg(content, sessionId)
    }