feiyu02
2024-12-31 0c59552dc14c9023d4c0a9d57509cce1d5a6d6da
Merge remote-tracking branch 'supervision/hc-socket-1129'
已修改7个文件
已添加6个文件
273 ■■■■■ 文件已修改
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/kotlin/cn/flightfeather/supervision/common/executor/BackgroundTaskCtrl.kt
@@ -1,6 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.common.exception.BizException
import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -37,7 +38,10 @@
                throw BizException("无法重复创建任务")
            }
        }
        val t = BgTask(type, id, name, task)
        val t = BgTask(type, id, name, task) { status ->
            // å‘送消息
            WebSocketSendMessageUtil.sendBgTaskMessage(status)
        }
        taskSet[id] = t
        return t
    }
@@ -62,7 +66,9 @@
            }
        } else {
            task.ready()
            task.future = executorService.submit { task.execute() }
            task.future = executorService.submit {
                task.execute()
            }
            return task
        }
    }
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTask.kt
@@ -11,13 +11,14 @@
    val id: String,
    val name: String,
    private val task: () -> Boolean,
    private val onStatusChange: (status: BgTaskStatus) -> Unit
) {
    var taskStatus = BgTaskStatus(type, id, name)
    var future: Future<*>? = null
    fun ready() {
        taskStatus.status = TaskStatus.RUNNING
        taskStatus.startTime = LocalDateTime.now()
        setStatus(TaskStatus.RUNNING)
    }
    fun execute() {
@@ -31,24 +32,29 @@
    }
    fun success() {
        taskStatus.status = TaskStatus.SUCCESS
        complete()
        setStatus(TaskStatus.SUCCESS)
    }
    fun fail() {
        taskStatus.status = TaskStatus.FAIL
        complete()
        setStatus(TaskStatus.FAIL)
    }
    fun shutdown() {
        if (future?.isCancelled == false && !future!!.isDone) {
            future!!.cancel(true)
        }
        taskStatus.status = TaskStatus.SHUTDOWN
        complete()
        setStatus(TaskStatus.SHUTDOWN)
    }
    fun complete() {
        taskStatus.endTime = LocalDateTime.now()
    }
    fun setStatus(status: TaskStatus) {
        taskStatus.status = status
        onStatusChange(taskStatus)
    }
}
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatus.kt
@@ -1,5 +1,7 @@
package cn.flightfeather.supervision.common.executor
import cn.flightfeather.supervision.socket.LocalDateTimeAdapter
import com.google.gson.annotations.JsonAdapter
import java.time.Duration
import java.time.LocalDateTime
@@ -15,12 +17,15 @@
    var status: TaskStatus = TaskStatus.WAITING
    //    å¼€å§‹æ—¶é—´
    @JsonAdapter(LocalDateTimeAdapter::class)
    var startTime: LocalDateTime? = null
    //    ç»“束时间
    @JsonAdapter(LocalDateTimeAdapter::class)
    var endTime: LocalDateTime? = null
    //    åˆ›å»ºæ—¶é—´
    @JsonAdapter(LocalDateTimeAdapter::class)
    var createTime: LocalDateTime = LocalDateTime.now()
    //    è¿è¡Œæ—¶é•¿ï¼ˆç§’)
src/main/kotlin/cn/flightfeather/supervision/common/executor/BgTaskStatusJsonSerializer.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,51 @@
package cn.flightfeather.supervision.common.executor;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * BgTaskStatus类的自定义序列化类 è§£å†³äº†è®¡ç®—属性无法序列化的问题
 * by hc 2024.12.10
 */
public class BgTaskStatusJsonSerializer implements JsonSerializer<BgTaskStatus> {
    @Override
    public JsonElement serialize(BgTaskStatus bgTaskStatus, Type typeOfSrc, JsonSerializationContext context) {
        JsonObject jsonObject = new JsonObject();
        // åºåˆ—化type
        jsonObject.addProperty("type", String.valueOf(bgTaskStatus.getType()));
        // åºåˆ—化id
        jsonObject.addProperty("id", bgTaskStatus.getId());
        // åºåˆ—化name
        jsonObject.addProperty("name", bgTaskStatus.getName());
        // åºåˆ—化status
        jsonObject.addProperty("status", String.valueOf(bgTaskStatus.getStatus()));
        // åºåˆ—化startTime
        jsonObject.addProperty("startTime", formatLocalDateTime(bgTaskStatus.getStartTime()));
        // åºåˆ—化endTime
        jsonObject.addProperty("endTime", formatLocalDateTime(bgTaskStatus.getEndTime()));
        // åºåˆ—化createTime
        jsonObject.addProperty("createTime", formatLocalDateTime(bgTaskStatus.getCreateTime()));
        // åºåˆ—化 è®¡ç®—属性runTime
        jsonObject.addProperty("runTime", bgTaskStatus.getRunTime());
        // åºåˆ—化extra
        if (bgTaskStatus.getExtra() != null) {
            jsonObject.add("extra", context.serialize(bgTaskStatus.getExtra()));
        }
        return jsonObject;
    }
    private String formatLocalDateTime(LocalDateTime localDateTime) {
        // å¦‚æžœLocalDateTime为null,则返回null
        if (localDateTime == null) {
            return null;
        }
        // æ ¼å¼åŒ–LocalDateTime
        return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(localDateTime);
    }
}
src/main/kotlin/cn/flightfeather/supervision/common/utils/Constant.kt
@@ -211,6 +211,17 @@
        MULTI_MODE("multi_mode", "多选模式"),
    }
    // socket消息类型
    enum class SocketMessageType(val value: Int, val des: String){
        BG_TASK(1, "后台任务"),
        BUSINESS_LOG(2, "业务日志"),
    }
    // socket心跳消息类型
    enum class SocketHeartMessageType(val value: Int, val des: String){
        HEART_MESSAGE_TYPE(0, "心跳机制")
    }
    companion object {
        //问题审核
src/main/kotlin/cn/flightfeather/supervision/common/utils/JsonUtil.kt
@@ -1,9 +1,13 @@
package cn.flightfeather.supervision.common.utils
import cn.flightfeather.supervision.common.executor.BgTaskStatus
import cn.flightfeather.supervision.common.executor.BgTaskStatusJsonSerializer
import com.google.gson.Gson
import com.google.gson.GsonBuilder
object JsonUtil {
    val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create()
    val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss")
        .registerTypeAdapter(BgTaskStatus::class.java, BgTaskStatusJsonSerializer())
        .create()
}
src/main/kotlin/cn/flightfeather/supervision/socket/LocalDateTimeAdapter.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,30 @@
package cn.flightfeather.supervision.socket;
import com.google.gson.*;
import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * LocalDateTime类型的时间格式序列化和反序列化类
 * by hc 2024.12.6
 */
public class LocalDateTimeAdapter implements JsonDeserializer<LocalDateTime>, JsonSerializer<LocalDateTime> {
    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @Override
    public JsonElement serialize(LocalDateTime src, Type typeOfSrc, JsonSerializationContext context) {
        return new JsonPrimitive(dateTimeFormatter.format(src));
    }
    @Override
    public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
        try {
            return LocalDateTime.parse(json.getAsString(), dateTimeFormatter);
        } catch (Exception e) {
            throw new JsonParseException(e);
        }
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessage.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,37 @@
package cn.flightfeather.supervision.socket;
public class WebSocketMessage {
    /*
    * æ¶ˆæ¯ç±»åž‹
    * */
    private int type;
    /*
    * æ¶ˆæ¯å†…容
    * */
    private Object content;
    public WebSocketMessage() {
    }
    public WebSocketMessage(int type, Object content) {
        this.type = type;
        this.content = content;
    }
    public int getType() {
        return type;
    }
    public void setType(int type) {
        this.type = type;
    }
    public Object getContent() {
        return content;
    }
    public void setContent(Object content) {
        this.content = content;
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketMessageParser.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,51 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.common.utils.JsonUtil;
import org.springframework.util.StringUtils;
public class WebSocketMessageParser {
    private static final String START_STR = "##";
    private static final String SPLIT_STR = "&&";
    private static final String END_STR = "%%";
    /**
     * æ¶ˆæ¯æ ¼å¼æ ¡éªŒ
     * @param message æ¶ˆæ¯
     * @return æ ¼å¼æ˜¯å¦åˆè§„
     */
    private static boolean verificationMessage(String message) {
        if (StringUtils.isEmpty(message)) {
            return false;
        }
        if (!message.startsWith(START_STR)) {
            return false;
        }
        if (!message.endsWith(END_STR)) {
            return false;
        }
        return true;
    }
    /**
     * è§£æžå‡ºç±»åž‹å’Œå†…容
     * @param message socket消息中的data字段
     * @return è§£æžç»“果,如果格式不正确则返回null
     */
    public static WebSocketMessage decodeMessage(String message) {
        if (!verificationMessage(message)) {
            // å‘挥一个不会被处理的消息
            return new WebSocketMessage(-1, "");
        }
        WebSocketMessage webSocketMessage = new WebSocketMessage();
        String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR);
        webSocketMessage.setType(Integer.parseInt(parts[0]));
        webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class));
        return webSocketMessage;
    }
    /**
     * ç”ŸæˆæŒ‡å®šæ ¼å¼çš„æ¶ˆæ¯å­—符串
     * @return ç”Ÿæˆçš„æ¶ˆæ¯å­—符串
     */
    public static String encodeMessage(WebSocketMessage webSocketMessage) {
        return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR;
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSendMessageUtil.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,18 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.common.executor.BgTaskStatus;
import cn.flightfeather.supervision.common.utils.Constant;
public class WebSocketSendMessageUtil {
    /**
     * å‘送后台任务的socket消息
     * @param bgTaskStatus æ¶ˆæ¯çš„内容
     */
    public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) {
        WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(),
                bgTaskStatus);
        String message = WebSocketMessageParser.encodeMessage(webSocketMessage);
        WebSocketSenderHandler.getInstance().broadcast(message);
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/WebSocketSenderHandler.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,19 @@
package cn.flightfeather.supervision.socket;
import cn.flightfeather.supervision.socket.processor.WebSocketSender;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class WebSocketSenderHandler implements ApplicationContextAware {
    static private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        WebSocketSenderHandler.applicationContext = applicationContext;
    }
    public static WebSocketSender getInstance() {
        return applicationContext.getBean(WebSocketSender.class);
    }
}
src/main/kotlin/cn/flightfeather/supervision/socket/config/SPTextWebSocketHandler.kt
@@ -1,5 +1,7 @@
package cn.flightfeather.supervision.socket.config
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessageParser
import cn.flightfeather.supervision.socket.WsSessionManager
import cn.flightfeather.supervision.socket.processor.WebSocketReceiver
import org.springframework.stereotype.Component
@@ -45,6 +47,10 @@
        val payload = message.payload
        val sessionId = session.attributes["session_id"]
        println("server æŽ¥æ”¶åˆ° $sessionId å‘送的 $payload")
        if (WebSocketMessageParser.decodeMessage(payload).type ==
            Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) {
            webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString())
        }
        webSocketReceiver.onReceiveMsg(payload)
//        session.sendMessage(TextMessage("server å‘送给 " + sessionId + " æ¶ˆæ¯ " + payload + " " + LocalDateTime.now()
//            .toString()))
src/main/kotlin/cn/flightfeather/supervision/socket/processor/WebSocketReceiver.kt
@@ -2,7 +2,11 @@
import cn.flightfeather.supervision.common.log.BizLog
import cn.flightfeather.supervision.common.log.WorkStreamLogInfo
import cn.flightfeather.supervision.common.utils.Constant
import cn.flightfeather.supervision.socket.WebSocketMessage
import cn.flightfeather.supervision.socket.WebSocketMessageParser
import org.springframework.stereotype.Component
import java.time.LocalDateTime
/**
 * webSocket消息接收管理
@@ -10,7 +14,7 @@
 * @author feiyu02
 */
@Component
class WebSocketReceiver(private val bizLog: BizLog) {
class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) {
    /**
     * æŽ¥æ”¶æ¶ˆæ¯å¤„理
@@ -21,4 +25,13 @@
            bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "朱正强", "在上海广发粉煤灰有限公司新增一个问题"))
        }
    }
    /**
     * æŽ¥æ”¶å¿ƒè·³æ¶ˆæ¯å¤„理
     */
    fun onReceiveHeartMsg(msg: String, sessionId: String) {
        val content = WebSocketMessageParser.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value,
            LocalDateTime.now()))
        webSocketSender.sendMsg(content, sessionId)
    }
}