From 22ce3a4c8453b54c2bfe6d582b734195f899f195 Mon Sep 17 00:00:00 2001
From: hcong <1050828145@qq.com>
Date: 星期二, 10 十二月 2024 14:59:50 +0800
Subject: [PATCH] 1. 新增socket生命周期管理、心跳机制、重连机制 2. 新增socket消息校验、解析、生成 3. 新增socket消息类型枚举类src\enum\socketMessage 3. 新增消息管理类src\socket\MessageManager 4. 新增观察者模式消息的发布订阅机制src\socket\eventBus 5. 修改后台任务页面注册后台任务状态消息并修改状态的逻辑

---
 src/socket/FYWebSocket.js               |  132 ++++++++++++++++++++++
 src/socket/index.js                     |   20 +++
 src/components/bg-task/FYBgTaskCard.vue |   31 +++++
 src/socket/MessageManager.js            |   27 ++++
 src/enum/socketMessage.js               |   29 ++++
 src/api/index.js                        |    5 
 src/main.js                             |    3 
 src/socket/eventBus.js                  |   26 ++++
 src/socket/socketMessage.js             |   51 ++++++++
 9 files changed, 323 insertions(+), 1 deletions(-)

diff --git a/src/api/index.js b/src/api/index.js
index 22e0559..3b5d513 100644
--- a/src/api/index.js
+++ b/src/api/index.js
@@ -17,6 +17,9 @@
   // ip2_file = 'https://fyami.com.cn/';
 }
 
+// socket
+const $socket_base_url = 'ws://192.168.0.150:8080/workstream'
+
 //椋炵窘鐩戠
 const $fysp = axios.create({
   baseURL: ip1,
@@ -106,4 +109,4 @@
   );
 });
 
-export { $fysp, $fytz };
+export { $fysp, $fytz, $socket_base_url };
diff --git a/src/components/bg-task/FYBgTaskCard.vue b/src/components/bg-task/FYBgTaskCard.vue
index 5853db3..e26d3dc 100644
--- a/src/components/bg-task/FYBgTaskCard.vue
+++ b/src/components/bg-task/FYBgTaskCard.vue
@@ -46,6 +46,8 @@
 import { useFetchData } from '@/composables/fetchData';
 import bgtaskApi from '@/api/fysp/bgtaskApi';
 import { enumBgTask, BG_TASK_TYPE, BG_TASK_STATUS } from '@/enum/bgTask';
+import { SOCKET_MESSAGE_TYPE } from '@/enum/socketMessage';
+import MessageManager from '@/socket/MessageManager.js';
 
 export default {
   setup() {
@@ -76,7 +78,36 @@
       deep: true
     }
   },
+  created() {
+    this.registerBgTaskMessage()
+  },
   methods: {
+    registerBgTaskMessage() {
+      MessageManager.register(SOCKET_MESSAGE_TYPE.BACKGROUND_TASK.name, (data) => {
+        this.refreshTaskById(data)
+      })
+    },
+    /**
+     * 鍒锋柊涓�涓换鍔¢�氳繃id锛屽鏋滄槸鏂扮殑浠诲姟鍒欐坊鍔犲埌浠诲姟鍒楄〃taskList涓�
+     * @param data 
+     */
+    refreshTaskById(data) {
+      if (!data || data == {}) {
+        return;
+      }
+      let isNewTask = true
+      for (let index = 0; index < this.taskList.length; index++) {
+          const task = this.taskList[index];
+          if (task.id == data.id) {
+            this.taskList[index] = data
+            isNewTask = false
+            break
+          }
+        }
+        if (isNewTask) {
+          this.taskList.push(data)
+        }
+    },
     addTask() {},
     newTestTask() {
       this.fetchData((page, pageSize) => {
diff --git a/src/enum/socketMessage.js b/src/enum/socketMessage.js
new file mode 100644
index 0000000..14ea741
--- /dev/null
+++ b/src/enum/socketMessage.js
@@ -0,0 +1,29 @@
+import { Enum } from './enum';
+const SOCKET_MESSAGE_TYPE = Enum({
+  BACKGROUND_TASK: {
+    id: 1,
+    value: '1',
+    label: '鍚庡彴浠诲姟',
+    name: 'background_task'
+  },
+  BUSINESS_LOG: { id: 2, value: '2', label: '涓氬姟鏃ュ織', name: 'business_log' }
+});
+/**
+ * 鏍规嵁socket娑堟伅绫诲瀷瀛楁寰楀埌杩欎釜绫诲瀷瀵硅薄
+ * @param {*} value socket娑堟伅绫诲瀷
+ * @returns 瀵瑰簲鐨勬秷鎭被鍨嬪璞�
+ */
+/**
+ * 閫氳繃鍚嶇О鏌ユ壘鏋氫妇绫�
+ * @param {String} value
+ * @returns
+ */
+function getByValue(value) {
+  for (const type in SOCKET_MESSAGE_TYPE) {
+    const typeObj = SOCKET_MESSAGE_TYPE[type]
+    if (typeObj.value == value) {
+      return typeObj
+    }
+  }
+}
+export { SOCKET_MESSAGE_TYPE, getByValue };
diff --git a/src/main.js b/src/main.js
index 6f7f655..ec75b91 100644
--- a/src/main.js
+++ b/src/main.js
@@ -5,6 +5,7 @@
 import { router } from './router';
 import App from './App.vue';
 import timeUtil from './utils/time-util';
+import { initSocketClient } from '@/socket/index.js'
 
 // import 'element-plus/dist/index.css';
 import './assets/main.css';
@@ -18,6 +19,8 @@
 import dayjs from 'dayjs';
 import isSameOrAfter from 'dayjs/plugin/isSameOrAfter';
 import isSameOrBefore from 'dayjs/plugin/isSameOrBefore';
+// socket瀹㈡埛绔�
+initSocketClient()
 
 // echarts
 import * as echarts from 'echarts'
diff --git a/src/socket/FYWebSocket.js b/src/socket/FYWebSocket.js
new file mode 100644
index 0000000..9c60b4a
--- /dev/null
+++ b/src/socket/FYWebSocket.js
@@ -0,0 +1,132 @@
+import { getByValue } from '@/enum/socketMessage.js';
+import { encodeMessage, decodeMessage } from './socketMessage.js';
+import { $socket_base_url } from '@/api/index.js';
+class FYWebSocket extends WebSocket {
+  constructor() {
+    super($socket_base_url);
+    return this;
+  }
+  /**
+   * heartBeatConfig 蹇冭烦杩炴帴鍙傛暟
+   *    time: 蹇冭烦鏃堕棿闂撮殧
+   *    timeout: 蹇冭烦瓒呮椂闂撮殧
+   *    reconnect: 鏂嚎閲嶈繛鏃堕棿闂撮殧
+   * reconnectFunc 鏂嚎閲嶈繛鍑芥暟
+   * messageManager 娑堟伅绠$悊鍣�
+   */
+  init(reconnectFunc, messageManager) {
+    this.onopen = this.openHandler; // 杩炴帴鎴愬姛鍚庣殑鍥炶皟鍑芥暟
+    this.onclose = this.closeHandler; // 杩炴帴鍏抽棴鍚庣殑鍥炶皟 鍑芥暟
+    this.onmessage = this.messageHandler; // 鏀跺埌鏈嶅姟鍣ㄦ暟鎹悗鐨勫洖璋冨嚱鏁�
+    this.onerror = this.errorHandler; // 杩炴帴鍙戠敓閿欒鐨勫洖璋冩柟娉�
+    this.heartBeatConfig = {
+      time: 30 * 1000,
+      timeout: 2 * 1000,
+      reconnect: 30 * 1000
+    }; // 蹇冭烦杩炴帴閰嶇疆鍙傛暟
+    this.isReconnect = typeof reconnectFunc === 'function'; // 璁板綍鏄惁鏂嚎閲嶈繛
+    this.reconnectFunc = reconnectFunc; // 鏂嚎閲嶈繛鍑芥暟
+    this.messageManager = messageManager; // 娑堟伅澶勭悊绠$悊鍣�
+    this.reconnectTimer = null; // 璁板綍鏂嚎閲嶈繛鐨勬椂闂村櫒
+    this.startHeartBeatTimer = null; // 璁板綍蹇冭烦鏃堕棿鍣�
+    this.webSocketState = false; // 璁板綍socket杩炴帴鐘舵�� true涓哄凡杩炴帴
+  }
+  // 鑾峰彇娑堟伅
+  getMessage({ data }) {
+    return decodeMessage(data);
+  }
+  // 鍙戦�佹秷鎭�
+  sendMessage(type, data) {
+    return this.send(encodeMessage(type, data));
+  }
+  // 杩炴帴鎴愬姛鍚庣殑鍥炶皟鍑芥暟
+  openHandler() {
+    console.log('====onopen websocket杩炴帴鎴愬姛====');
+    // socket鐘舵�佽缃负杩炴帴锛屽仛涓哄悗闈㈢殑鏂嚎閲嶈繛鐨勬嫤鎴櫒
+    this.webSocketState = true;
+    // 鍒ゆ柇鏄惁鍚姩蹇冭烦鏈哄埗
+    if (this.heartBeatConfig && this.heartBeatConfig.time) {
+      this.startHeartBeat(this.heartBeatConfig.time);
+    }
+  }
+  // 鏀跺埌鏈嶅姟鍣ㄦ暟鎹悗鐨勫洖璋冨嚱鏁�
+  messageHandler(res) {
+    const webSocketMsg = this.getMessage(res);
+    if (!(webSocketMsg && webSocketMsg != null && webSocketMsg != {})) {
+      return;
+    }
+    const type = webSocketMsg.type;
+    const data = webSocketMsg.data;
+    let typeObj = getByValue(type);
+    if (type == 0) {
+      // 灏嗚繛鎺ョ姸鎬佹洿鏂颁负鍦ㄧ嚎
+      this.webSocketState = true;
+      console.log('====onmessage websocket蹇冭烦妫�娴�====', data);
+    } else {
+      // 鍙戦�佷簨浠�
+      this.messageManager.emit(typeObj.name, data);
+      console.log(`====onmessage websocket${typeObj.label}====`, data);
+    }
+  }
+  // 杩炴帴鍏抽棴鍚庣殑鍥炶皟 鍑芥暟
+  closeHandler() {
+    console.log('====onclose websocket鍏抽棴杩炴帴====');
+    // 璁剧疆socket鐘舵�佷负鏂嚎
+    this.webSocketState = false;
+    // 鍦ㄦ柇寮�杩炴帴鏃� 娓呴櫎蹇冭烦鏃堕棿鍣ㄥ拰 鏂紑閲嶈繛鏃堕棿鍣�
+    this.startHeartBeatTimer && clearTimeout(this.startHeartBeatTimer);
+    this.reconnectTimer && clearTimeout(this.reconnectTimer);
+    this.reconnectWebSocket();
+  }
+  errorHandler() {
+    console.log('====onerror websocket杩炴帴鍑洪敊====');
+    // 璁剧疆socket鐘舵�佷负鏂嚎
+    this.webSocketState = false;
+    // 閲嶆柊杩炴帴
+    this.reconnectWebSocket();
+  }
+
+  // 蹇冭烦鍒濆鍖栨柟娉� time锛氬績璺抽棿闅�
+  startHeartBeat(time) {
+    this.startHeartBeatTimer = setTimeout(() => {
+      // 瀹㈡埛绔瘡闅斾竴娈垫椂闂村悜鏈嶅姟绔彂閫佷竴涓績璺虫秷鎭�
+      this.sendMessage('0', Date.now());
+      this.waitingServer();
+    }, time);
+  }
+  //鍦ㄥ鎴风鍙戦�佹秷鎭箣鍚庯紝寤舵椂绛夊緟鏈嶅姟鍣ㄥ搷搴�,閫氳繃webSocketState鍒ゆ柇鏄惁杩炵嚎鎴愬姛
+  waitingServer() {
+    this.webSocketState = false;
+    setTimeout(() => {
+      // 杩炵嚎鎴愬姛鐘舵�佷笅 缁х画蹇冭烦妫�娴�
+      if (this.webSocketState) {
+        this.startHeartBeat(this.heartBeatConfig.time);
+        return;
+      }
+      console.log('websocket 蹇冭烦鏃犲搷搴�, 宸茬粡鍜屾湇鍔$鏂嚎');
+      // 閲嶆柊杩炴帴鏃讹紝璁板緱瑕佸厛鍏抽棴褰撳墠杩炴帴
+      try {
+        this.close();
+      } catch (error) {
+        console.log('websocket 褰撳墠杩炴帴宸茬粡鍏抽棴');
+      }
+      // // 閲嶆柊杩炴帴
+      // this.reconnectWebSocket()
+    }, this.heartBeatConfig.timeout);
+  }
+
+  // 閲嶆柊杩炴帴
+  reconnectWebSocket() {
+    // 鍒ゆ柇鏄惁鏄噸鏂拌繛鎺ョ姸鎬�(鍗宠鍔ㄧ姸鎬佹柇绾�)锛屽鏋滄槸涓诲姩鏂嚎鐨勪笉闇�瑕侀噸鏂拌繛鎺�
+    if (!this.isReconnect) {
+      return;
+    }
+    // 鏍规嵁浼犲叆鐨勬柇绾块噸杩炴椂闂撮棿闅� 寤舵椂杩炴帴
+    this.reconnectTimer = setTimeout(() => {
+      // 瑙﹀彂閲嶆柊杩炴帴鍑芥暟
+      console.log('====onerror websocket灏濊瘯閲嶈繛====');
+      this.reconnectFunc();
+    }, this.heartBeatConfig.reconnect);
+  }
+}
+export { FYWebSocket };
diff --git a/src/socket/MessageManager.js b/src/socket/MessageManager.js
new file mode 100644
index 0000000..7ae5bff
--- /dev/null
+++ b/src/socket/MessageManager.js
@@ -0,0 +1,27 @@
+import eventBus from './eventBus.js';
+
+class MessageManager {
+  constructor() {
+    this.messageHandler = eventBus;
+  }
+  /**
+   * 娉ㄥ唽浜嬩欢
+   * @param {*} eventType 浜嬩欢绫诲瀷
+   * @param {*} handler 浜嬩欢澶勭悊鍑芥暟
+   */
+  register(eventType, handler) {
+    this.messageHandler.register(eventType, handler);
+  }
+
+  /**
+   * 鍙戦�佷簨浠�
+   * @param {*} eventType 浜嬩欢绫诲瀷
+   * @param  {...any} ars 浜嬩欢鍐呭
+   */
+  emit(eventType, ...ars) {
+    this.messageHandler.emit(eventType, ...ars);
+  }
+}
+const instance = new MessageManager();
+Object.freeze(instance); // 闃叉淇敼瀹炰緥
+export default instance;
diff --git a/src/socket/eventBus.js b/src/socket/eventBus.js
new file mode 100644
index 0000000..ba8e855
--- /dev/null
+++ b/src/socket/eventBus.js
@@ -0,0 +1,26 @@
+// 瑙傚療鑰呮ā寮�
+class EventBus {
+    constructor() {
+      // 娑堟伅涓績锛岃褰曚簡鎵�鏈夌殑浜嬩欢 浠ュ強 浜嬩欢瀵瑰簲鐨勫鐞嗗嚱鏁�
+      this.subs = Object.create(null)
+    }
+   
+    // 娉ㄥ唽鏃堕棿
+    // 鍙傛暟锛�1.浜嬩欢鍚嶇О  2.浜嬩欢澶勭悊鍑芥暟
+    register(eventType, handler) {
+      this.subs[eventType] = this.subs[eventType] || []
+      this.subs[eventType].push(handler)
+    }
+   
+    // 瑙﹀彂浜嬩欢
+    // 鍙傛暟锛� 1.浜嬩欢鍚嶇О 2.鎺ユ敹鐨勫弬鏁�
+    emit(eventType, ...ars) {
+      if(this.subs[eventType]) {
+        this.subs[eventType].forEach(handler => {
+          handler(...ars)
+        })
+      }
+    }
+  }
+   
+  export default new EventBus()
\ No newline at end of file
diff --git a/src/socket/index.js b/src/socket/index.js
new file mode 100644
index 0000000..ce75986
--- /dev/null
+++ b/src/socket/index.js
@@ -0,0 +1,20 @@
+import { FYWebSocket } from '@/socket/FYWebSocket.js';
+import MessageManager from '@/socket/MessageManager.js';
+
+let socket = null;
+
+// 杩炴帴websocket
+function connectWebSocket() {
+  socket = new FYWebSocket();
+  socket.init(() => {
+    connectWebSocket();
+  }, MessageManager);
+  return socket;
+}
+/**
+ * 鍒濆鍖杝ocket瀹㈡埛绔�
+ */
+function initSocketClient() {
+  connectWebSocket();
+}
+export { initSocketClient };
diff --git a/src/socket/socketMessage.js b/src/socket/socketMessage.js
new file mode 100644
index 0000000..e3fe02c
--- /dev/null
+++ b/src/socket/socketMessage.js
@@ -0,0 +1,51 @@
+// 寮�濮嬬鍙峰拰缁撴潫绗﹀彿鍒嗗埆涓� '##' 鍜� '%%', 鍒嗛殧绗︿负 &&
+// 寮�濮嬬鍙�
+const startStr = '##';
+// 鍒嗛殧绗﹀彿
+const splitStr = '&&';
+// 缁撴潫绗﹀彿
+const endStr = '%%';
+// 鏍¢獙鏍煎紡
+function verificationMessage(message) {
+  if (!message || message == '') {
+    return false;
+  }
+  if (!(typeof message == 'string')) {
+    return false;
+  }
+  if (!message.startsWith(startStr)) {
+    return false;
+  }
+  if (!message.endsWith(endStr)) {
+    return false;
+  }
+  return true;
+}
+/**
+ * 瑙f瀽鍑虹被鍨嬪拰鍐呭
+ * @param {*} message socket娑堟伅涓殑data瀛楁
+ * @returns 
+ */
+function decodeMessage(message) {
+  if (!verificationMessage(message)) {
+    return;
+  }
+  const parts = message.slice(startStr.length, -endStr.length).split(splitStr);
+  const type = parts[0];
+  let data = JSON.parse(parts[1]);
+  return {
+    type: type,
+    data: data
+  };
+}
+/**
+ * 鐢熸垚鎸囧畾鏍煎紡鐨勬秷鎭瓧绗︿覆
+ * @param {*} type 娑堟伅绫诲瀷
+ * @param {*} data 娑堟伅鍐呭
+ * @returns 鐢熸垚鐨勬秷鎭瓧绗︿覆
+ */
+function encodeMessage(type, data) {
+  return `${startStr}${type}${splitStr}${JSON.stringify(data)}${endStr}`;
+}
+
+export { verificationMessage, decodeMessage, encodeMessage };

--
Gitblit v1.9.3