Merge remote-tracking branch 'supervision/hc-socket-1129'
| | |
| | | package cn.flightfeather.supervision.common.executor |
| | | |
| | | import cn.flightfeather.supervision.common.exception.BizException |
| | | import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil |
| | | import org.springframework.stereotype.Component |
| | | import java.util.concurrent.ConcurrentHashMap |
| | | import java.util.concurrent.Executors |
| | |
| | | throw BizException("æ æ³éå¤å建任å¡") |
| | | } |
| | | } |
| | | val t = BgTask(type, id, name, task) |
| | | val t = BgTask(type, id, name, task) { status -> |
| | | // åéæ¶æ¯ |
| | | WebSocketSendMessageUtil.sendBgTaskMessage(status) |
| | | } |
| | | taskSet[id] = t |
| | | return t |
| | | } |
| | |
| | | } |
| | | } else { |
| | | task.ready() |
| | | task.future = executorService.submit { task.execute() } |
| | | task.future = executorService.submit { |
| | | task.execute() |
| | | } |
| | | return task |
| | | } |
| | | } |
| | |
| | | val id: String, |
| | | val name: String, |
| | | private val task: () -> Boolean, |
| | | private val onStatusChange: (status: BgTaskStatus) -> Unit |
| | | ) { |
| | | var taskStatus = BgTaskStatus(type, id, name) |
| | | var future: Future<*>? = null |
| | | |
| | | fun ready() { |
| | | taskStatus.status = TaskStatus.RUNNING |
| | | taskStatus.startTime = LocalDateTime.now() |
| | | setStatus(TaskStatus.RUNNING) |
| | | } |
| | | |
| | | fun execute() { |
| | |
| | | } |
| | | |
| | | fun success() { |
| | | taskStatus.status = TaskStatus.SUCCESS |
| | | complete() |
| | | setStatus(TaskStatus.SUCCESS) |
| | | } |
| | | |
| | | fun fail() { |
| | | taskStatus.status = TaskStatus.FAIL |
| | | complete() |
| | | setStatus(TaskStatus.FAIL) |
| | | } |
| | | |
| | | fun shutdown() { |
| | | if (future?.isCancelled == false && !future!!.isDone) { |
| | | future!!.cancel(true) |
| | | } |
| | | taskStatus.status = TaskStatus.SHUTDOWN |
| | | complete() |
| | | setStatus(TaskStatus.SHUTDOWN) |
| | | } |
| | | |
| | | fun complete() { |
| | | taskStatus.endTime = LocalDateTime.now() |
| | | } |
| | | |
| | | fun setStatus(status: TaskStatus) { |
| | | taskStatus.status = status |
| | | onStatusChange(taskStatus) |
| | | } |
| | | } |
| | |
| | | package cn.flightfeather.supervision.common.executor |
| | | |
| | | import cn.flightfeather.supervision.socket.LocalDateTimeAdapter |
| | | import com.google.gson.annotations.JsonAdapter |
| | | import java.time.Duration |
| | | import java.time.LocalDateTime |
| | | |
| | |
| | | var status: TaskStatus = TaskStatus.WAITING |
| | | |
| | | // å¼å§æ¶é´ |
| | | @JsonAdapter(LocalDateTimeAdapter::class) |
| | | var startTime: LocalDateTime? = null |
| | | |
| | | // ç»ææ¶é´ |
| | | @JsonAdapter(LocalDateTimeAdapter::class) |
| | | var endTime: LocalDateTime? = null |
| | | |
| | | // å建æ¶é´ |
| | | @JsonAdapter(LocalDateTimeAdapter::class) |
| | | var createTime: LocalDateTime = LocalDateTime.now() |
| | | |
| | | // è¿è¡æ¶é¿ï¼ç§ï¼ |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.common.executor; |
| | | |
| | | import com.google.gson.JsonElement; |
| | | import com.google.gson.JsonObject; |
| | | import com.google.gson.JsonSerializationContext; |
| | | import com.google.gson.JsonSerializer; |
| | | import java.lang.reflect.Type; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | |
| | | /** |
| | | * BgTaskStatusç±»çèªå®ä¹åºååç±» è§£å³äºè®¡ç®å±æ§æ æ³åºååçé®é¢ |
| | | * by hc 2024.12.10 |
| | | */ |
| | | public class BgTaskStatusJsonSerializer implements JsonSerializer<BgTaskStatus> { |
| | | |
| | | @Override |
| | | public JsonElement serialize(BgTaskStatus bgTaskStatus, Type typeOfSrc, JsonSerializationContext context) { |
| | | JsonObject jsonObject = new JsonObject(); |
| | | // åºååtype |
| | | jsonObject.addProperty("type", String.valueOf(bgTaskStatus.getType())); |
| | | // åºååid |
| | | jsonObject.addProperty("id", bgTaskStatus.getId()); |
| | | // åºååname |
| | | jsonObject.addProperty("name", bgTaskStatus.getName()); |
| | | // åºååstatus |
| | | jsonObject.addProperty("status", String.valueOf(bgTaskStatus.getStatus())); |
| | | // åºååstartTime |
| | | jsonObject.addProperty("startTime", formatLocalDateTime(bgTaskStatus.getStartTime())); |
| | | // åºååendTime |
| | | jsonObject.addProperty("endTime", formatLocalDateTime(bgTaskStatus.getEndTime())); |
| | | // åºååcreateTime |
| | | jsonObject.addProperty("createTime", formatLocalDateTime(bgTaskStatus.getCreateTime())); |
| | | // åºåå 计ç®å±æ§runTime |
| | | jsonObject.addProperty("runTime", bgTaskStatus.getRunTime()); |
| | | // åºååextra |
| | | if (bgTaskStatus.getExtra() != null) { |
| | | jsonObject.add("extra", context.serialize(bgTaskStatus.getExtra())); |
| | | } |
| | | return jsonObject; |
| | | } |
| | | |
| | | private String formatLocalDateTime(LocalDateTime localDateTime) { |
| | | // 妿LocalDateTime为nullï¼åè¿ånull |
| | | if (localDateTime == null) { |
| | | return null; |
| | | } |
| | | // æ ¼å¼åLocalDateTime |
| | | return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(localDateTime); |
| | | } |
| | | } |
| | |
| | | MULTI_MODE("multi_mode", "å¤é模å¼"), |
| | | } |
| | | |
| | | // socketæ¶æ¯ç±»å |
| | | enum class SocketMessageType(val value: Int, val des: String){ |
| | | BG_TASK(1, "åå°ä»»å¡"), |
| | | BUSINESS_LOG(2, "ä¸å¡æ¥å¿"), |
| | | } |
| | | |
| | | // socketå¿è·³æ¶æ¯ç±»å |
| | | enum class SocketHeartMessageType(val value: Int, val des: String){ |
| | | HEART_MESSAGE_TYPE(0, "å¿è·³æºå¶") |
| | | } |
| | | |
| | | |
| | | companion object { |
| | | //é®é¢å®¡æ ¸ |
| | |
| | | package cn.flightfeather.supervision.common.utils |
| | | |
| | | import cn.flightfeather.supervision.common.executor.BgTaskStatus |
| | | import cn.flightfeather.supervision.common.executor.BgTaskStatusJsonSerializer |
| | | import com.google.gson.Gson |
| | | import com.google.gson.GsonBuilder |
| | | |
| | | object JsonUtil { |
| | | |
| | | val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create() |
| | | val gson: Gson = GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss") |
| | | .registerTypeAdapter(BgTaskStatus::class.java, BgTaskStatusJsonSerializer()) |
| | | .create() |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.socket; |
| | | |
| | | import com.google.gson.*; |
| | | |
| | | import java.lang.reflect.Type; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | |
| | | /** |
| | | * LocalDateTimeç±»åçæ¶é´æ ¼å¼åºåååååºååç±» |
| | | * by hc 2024.12.6 |
| | | */ |
| | | public class LocalDateTimeAdapter implements JsonDeserializer<LocalDateTime>, JsonSerializer<LocalDateTime> { |
| | | private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); |
| | | |
| | | @Override |
| | | public JsonElement serialize(LocalDateTime src, Type typeOfSrc, JsonSerializationContext context) { |
| | | return new JsonPrimitive(dateTimeFormatter.format(src)); |
| | | } |
| | | |
| | | @Override |
| | | public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { |
| | | try { |
| | | return LocalDateTime.parse(json.getAsString(), dateTimeFormatter); |
| | | } catch (Exception e) { |
| | | throw new JsonParseException(e); |
| | | } |
| | | } |
| | | } |
| | | |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.socket; |
| | | |
| | | public class WebSocketMessage { |
| | | /* |
| | | * æ¶æ¯ç±»å |
| | | * */ |
| | | private int type; |
| | | |
| | | /* |
| | | * æ¶æ¯å
容 |
| | | * */ |
| | | private Object content; |
| | | |
| | | public WebSocketMessage() { |
| | | } |
| | | |
| | | public WebSocketMessage(int type, Object content) { |
| | | this.type = type; |
| | | this.content = content; |
| | | } |
| | | |
| | | public int getType() { |
| | | return type; |
| | | } |
| | | |
| | | public void setType(int type) { |
| | | this.type = type; |
| | | } |
| | | |
| | | public Object getContent() { |
| | | return content; |
| | | } |
| | | |
| | | public void setContent(Object content) { |
| | | this.content = content; |
| | | } |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.socket; |
| | | |
| | | import cn.flightfeather.supervision.common.utils.JsonUtil; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | public class WebSocketMessageParser { |
| | | private static final String START_STR = "##"; |
| | | private static final String SPLIT_STR = "&&"; |
| | | private static final String END_STR = "%%"; |
| | | |
| | | /** |
| | | * æ¶æ¯æ ¼å¼æ ¡éª |
| | | * @param message æ¶æ¯ |
| | | * @return æ ¼å¼æ¯å¦åè§ |
| | | */ |
| | | private static boolean verificationMessage(String message) { |
| | | if (StringUtils.isEmpty(message)) { |
| | | return false; |
| | | } |
| | | if (!message.startsWith(START_STR)) { |
| | | return false; |
| | | } |
| | | if (!message.endsWith(END_STR)) { |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | /** |
| | | * è§£æåºç±»ååå
容 |
| | | * @param message socketæ¶æ¯ä¸çdataåæ®µ |
| | | * @return è§£æç»æï¼å¦ææ ¼å¼ä¸æ£ç¡®åè¿ånull |
| | | */ |
| | | public static WebSocketMessage decodeMessage(String message) { |
| | | if (!verificationMessage(message)) { |
| | | // 忥ä¸ä¸ªä¸ä¼è¢«å¤ççæ¶æ¯ |
| | | return new WebSocketMessage(-1, ""); |
| | | } |
| | | WebSocketMessage webSocketMessage = new WebSocketMessage(); |
| | | String[] parts = message.substring(START_STR.length(), message.length() - END_STR.length()).split(SPLIT_STR); |
| | | webSocketMessage.setType(Integer.parseInt(parts[0])); |
| | | webSocketMessage.setContent(JsonUtil.INSTANCE.getGson().fromJson(parts[1], Object.class)); |
| | | return webSocketMessage; |
| | | } |
| | | /** |
| | | * çææå®æ ¼å¼çæ¶æ¯å符串 |
| | | * @return çæçæ¶æ¯å符串 |
| | | */ |
| | | public static String encodeMessage(WebSocketMessage webSocketMessage) { |
| | | return START_STR + webSocketMessage.getType() + SPLIT_STR + JsonUtil.INSTANCE.getGson().toJson(webSocketMessage.getContent(), webSocketMessage.getContent().getClass()) + END_STR; |
| | | } |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.socket; |
| | | |
| | | import cn.flightfeather.supervision.common.executor.BgTaskStatus; |
| | | import cn.flightfeather.supervision.common.utils.Constant; |
| | | |
| | | public class WebSocketSendMessageUtil { |
| | | |
| | | /** |
| | | * åéåå°ä»»å¡çsocketæ¶æ¯ |
| | | * @param bgTaskStatus æ¶æ¯çå
容 |
| | | */ |
| | | public static void sendBgTaskMessage(BgTaskStatus bgTaskStatus) { |
| | | WebSocketMessage webSocketMessage = new WebSocketMessage(Constant.SocketMessageType.BG_TASK.getValue(), |
| | | bgTaskStatus); |
| | | String message = WebSocketMessageParser.encodeMessage(webSocketMessage); |
| | | WebSocketSenderHandler.getInstance().broadcast(message); |
| | | } |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package cn.flightfeather.supervision.socket; |
| | | |
| | | import cn.flightfeather.supervision.socket.processor.WebSocketSender; |
| | | import org.springframework.beans.BeansException; |
| | | import org.springframework.context.ApplicationContext; |
| | | import org.springframework.context.ApplicationContextAware; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | @Component |
| | | public class WebSocketSenderHandler implements ApplicationContextAware { |
| | | static private ApplicationContext applicationContext; |
| | | @Override |
| | | public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { |
| | | WebSocketSenderHandler.applicationContext = applicationContext; |
| | | } |
| | | public static WebSocketSender getInstance() { |
| | | return applicationContext.getBean(WebSocketSender.class); |
| | | } |
| | | } |
| | |
| | | package cn.flightfeather.supervision.socket.config |
| | | |
| | | import cn.flightfeather.supervision.common.utils.Constant |
| | | import cn.flightfeather.supervision.socket.WebSocketMessageParser |
| | | import cn.flightfeather.supervision.socket.WsSessionManager |
| | | import cn.flightfeather.supervision.socket.processor.WebSocketReceiver |
| | | import org.springframework.stereotype.Component |
| | |
| | | val payload = message.payload |
| | | val sessionId = session.attributes["session_id"] |
| | | println("server æ¥æ¶å° $sessionId åéç $payload") |
| | | if (WebSocketMessageParser.decodeMessage(payload).type == |
| | | Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value) { |
| | | webSocketReceiver.onReceiveHeartMsg(payload, sessionId.toString()) |
| | | } |
| | | webSocketReceiver.onReceiveMsg(payload) |
| | | // session.sendMessage(TextMessage("server åéç» " + sessionId + " æ¶æ¯ " + payload + " " + LocalDateTime.now() |
| | | // .toString())) |
| | |
| | | |
| | | import cn.flightfeather.supervision.common.log.BizLog |
| | | import cn.flightfeather.supervision.common.log.WorkStreamLogInfo |
| | | import cn.flightfeather.supervision.common.utils.Constant |
| | | import cn.flightfeather.supervision.socket.WebSocketMessage |
| | | import cn.flightfeather.supervision.socket.WebSocketMessageParser |
| | | import org.springframework.stereotype.Component |
| | | import java.time.LocalDateTime |
| | | |
| | | /** |
| | | * webSocketæ¶æ¯æ¥æ¶ç®¡ç |
| | |
| | | * @author feiyu02 |
| | | */ |
| | | @Component |
| | | class WebSocketReceiver(private val bizLog: BizLog) { |
| | | class WebSocketReceiver(private val bizLog: BizLog, private val webSocketSender: WebSocketSender) { |
| | | |
| | | /** |
| | | * æ¥æ¶æ¶æ¯å¤ç |
| | |
| | | bizLog.info(WorkStreamLogInfo("8FAqSPnAA8ry4ExX", "æ±æ£å¼º", "å¨ä¸æµ·å¹¿åç²ç
¤ç°æéå
¬å¸æ°å¢ä¸ä¸ªé®é¢")) |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æ¥æ¶å¿è·³æ¶æ¯å¤ç |
| | | */ |
| | | fun onReceiveHeartMsg(msg: String, sessionId: String) { |
| | | val content = WebSocketMessageParser.encodeMessage(WebSocketMessage(Constant.SocketHeartMessageType.HEART_MESSAGE_TYPE.value, |
| | | LocalDateTime.now())) |
| | | webSocketSender.sendMsg(content, sessionId) |
| | | } |
| | | } |