1. 新增socket生命周期管理、心跳机制、重连机制 2. 新增socket消息校验、解析、生成 3. 新增socket消息类型枚举类src\enum\socketMessage 3. 新增消息管理类src\socket\MessageManager 4. 新增观察者模式消息的发布订阅机制src\socket\eventBus 5. 修改后台任务页面注册后台任务状态消息并修改状态的逻辑
已修改3个文件
已添加6个文件
324 ■■■■■ 文件已修改
src/api/index.js 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/components/bg-task/FYBgTaskCard.vue 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/enum/socketMessage.js 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main.js 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/FYWebSocket.js 132 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/MessageManager.js 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/eventBus.js 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/index.js 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/socketMessage.js 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/api/index.js
@@ -17,6 +17,9 @@
  // ip2_file = 'https://fyami.com.cn/';
}
// socket
const $socket_base_url = 'ws://192.168.0.150:8080/workstream'
//飞羽监管
const $fysp = axios.create({
  baseURL: ip1,
@@ -106,4 +109,4 @@
  );
});
export { $fysp, $fytz };
export { $fysp, $fytz, $socket_base_url };
src/components/bg-task/FYBgTaskCard.vue
@@ -46,6 +46,8 @@
import { useFetchData } from '@/composables/fetchData';
import bgtaskApi from '@/api/fysp/bgtaskApi';
import { enumBgTask, BG_TASK_TYPE, BG_TASK_STATUS } from '@/enum/bgTask';
import { SOCKET_MESSAGE_TYPE } from '@/enum/socketMessage';
import MessageManager from '@/socket/MessageManager.js';
export default {
  setup() {
@@ -76,7 +78,36 @@
      deep: true
    }
  },
  created() {
    this.registerBgTaskMessage()
  },
  methods: {
    registerBgTaskMessage() {
      MessageManager.register(SOCKET_MESSAGE_TYPE.BACKGROUND_TASK.name, (data) => {
        this.refreshTaskById(data)
      })
    },
    /**
     * åˆ·æ–°ä¸€ä¸ªä»»åŠ¡é€šè¿‡id,如果是新的任务则添加到任务列表taskList中
     * @param data
     */
    refreshTaskById(data) {
      if (!data || data == {}) {
        return;
      }
      let isNewTask = true
      for (let index = 0; index < this.taskList.length; index++) {
          const task = this.taskList[index];
          if (task.id == data.id) {
            this.taskList[index] = data
            isNewTask = false
            break
          }
        }
        if (isNewTask) {
          this.taskList.push(data)
        }
    },
    addTask() {},
    newTestTask() {
      this.fetchData((page, pageSize) => {
src/enum/socketMessage.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,29 @@
import { Enum } from './enum';
const SOCKET_MESSAGE_TYPE = Enum({
  BACKGROUND_TASK: {
    id: 1,
    value: '1',
    label: '后台任务',
    name: 'background_task'
  },
  BUSINESS_LOG: { id: 2, value: '2', label: '业务日志', name: 'business_log' }
});
/**
 * æ ¹æ®socket消息类型字段得到这个类型对象
 * @param {*} value socket消息类型
 * @returns å¯¹åº”的消息类型对象
 */
/**
 * é€šè¿‡åç§°æŸ¥æ‰¾æžšä¸¾ç±»
 * @param {String} value
 * @returns
 */
function getByValue(value) {
  for (const type in SOCKET_MESSAGE_TYPE) {
    const typeObj = SOCKET_MESSAGE_TYPE[type]
    if (typeObj.value == value) {
      return typeObj
    }
  }
}
export { SOCKET_MESSAGE_TYPE, getByValue };
src/main.js
@@ -5,6 +5,7 @@
import { router } from './router';
import App from './App.vue';
import timeUtil from './utils/time-util';
import { initSocketClient } from '@/socket/index.js'
// import 'element-plus/dist/index.css';
import './assets/main.css';
@@ -18,6 +19,8 @@
import dayjs from 'dayjs';
import isSameOrAfter from 'dayjs/plugin/isSameOrAfter';
import isSameOrBefore from 'dayjs/plugin/isSameOrBefore';
// socket客户端
initSocketClient()
// echarts
import * as echarts from 'echarts'
src/socket/FYWebSocket.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,132 @@
import { getByValue } from '@/enum/socketMessage.js';
import { encodeMessage, decodeMessage } from './socketMessage.js';
import { $socket_base_url } from '@/api/index.js';
class FYWebSocket extends WebSocket {
  constructor() {
    super($socket_base_url);
    return this;
  }
  /**
   * heartBeatConfig å¿ƒè·³è¿žæŽ¥å‚æ•°
   *    time: å¿ƒè·³æ—¶é—´é—´éš”
   *    timeout: å¿ƒè·³è¶…æ—¶é—´éš”
   *    reconnect: æ–­çº¿é‡è¿žæ—¶é—´é—´éš”
   * reconnectFunc æ–­çº¿é‡è¿žå‡½æ•°
   * messageManager æ¶ˆæ¯ç®¡ç†å™¨
   */
  init(reconnectFunc, messageManager) {
    this.onopen = this.openHandler; // è¿žæŽ¥æˆåŠŸåŽçš„å›žè°ƒå‡½æ•°
    this.onclose = this.closeHandler; // è¿žæŽ¥å…³é—­åŽçš„回调 å‡½æ•°
    this.onmessage = this.messageHandler; // æ”¶åˆ°æœåŠ¡å™¨æ•°æ®åŽçš„å›žè°ƒå‡½æ•°
    this.onerror = this.errorHandler; // è¿žæŽ¥å‘生错误的回调方法
    this.heartBeatConfig = {
      time: 30 * 1000,
      timeout: 2 * 1000,
      reconnect: 30 * 1000
    }; // å¿ƒè·³è¿žæŽ¥é…ç½®å‚æ•°
    this.isReconnect = typeof reconnectFunc === 'function'; // è®°å½•是否断线重连
    this.reconnectFunc = reconnectFunc; // æ–­çº¿é‡è¿žå‡½æ•°
    this.messageManager = messageManager; // æ¶ˆæ¯å¤„理管理器
    this.reconnectTimer = null; // è®°å½•断线重连的时间器
    this.startHeartBeatTimer = null; // è®°å½•心跳时间器
    this.webSocketState = false; // è®°å½•socket连接状态 true为已连接
  }
  // èŽ·å–æ¶ˆæ¯
  getMessage({ data }) {
    return decodeMessage(data);
  }
  // å‘送消息
  sendMessage(type, data) {
    return this.send(encodeMessage(type, data));
  }
  // è¿žæŽ¥æˆåŠŸåŽçš„å›žè°ƒå‡½æ•°
  openHandler() {
    console.log('====onopen websocket连接成功====');
    // socket状态设置为连接,做为后面的断线重连的拦截器
    this.webSocketState = true;
    // åˆ¤æ–­æ˜¯å¦å¯åŠ¨å¿ƒè·³æœºåˆ¶
    if (this.heartBeatConfig && this.heartBeatConfig.time) {
      this.startHeartBeat(this.heartBeatConfig.time);
    }
  }
  // æ”¶åˆ°æœåŠ¡å™¨æ•°æ®åŽçš„å›žè°ƒå‡½æ•°
  messageHandler(res) {
    const webSocketMsg = this.getMessage(res);
    if (!(webSocketMsg && webSocketMsg != null && webSocketMsg != {})) {
      return;
    }
    const type = webSocketMsg.type;
    const data = webSocketMsg.data;
    let typeObj = getByValue(type);
    if (type == 0) {
      // å°†è¿žæŽ¥çŠ¶æ€æ›´æ–°ä¸ºåœ¨çº¿
      this.webSocketState = true;
      console.log('====onmessage websocket心跳检测====', data);
    } else {
      // å‘送事件
      this.messageManager.emit(typeObj.name, data);
      console.log(`====onmessage websocket${typeObj.label}====`, data);
    }
  }
  // è¿žæŽ¥å…³é—­åŽçš„回调 å‡½æ•°
  closeHandler() {
    console.log('====onclose websocket关闭连接====');
    // è®¾ç½®socket状态为断线
    this.webSocketState = false;
    // åœ¨æ–­å¼€è¿žæŽ¥æ—¶ æ¸…除心跳时间器和 æ–­å¼€é‡è¿žæ—¶é—´å™¨
    this.startHeartBeatTimer && clearTimeout(this.startHeartBeatTimer);
    this.reconnectTimer && clearTimeout(this.reconnectTimer);
    this.reconnectWebSocket();
  }
  errorHandler() {
    console.log('====onerror websocket连接出错====');
    // è®¾ç½®socket状态为断线
    this.webSocketState = false;
    // é‡æ–°è¿žæŽ¥
    this.reconnectWebSocket();
  }
  // å¿ƒè·³åˆå§‹åŒ–方法 time:心跳间隔
  startHeartBeat(time) {
    this.startHeartBeatTimer = setTimeout(() => {
      // å®¢æˆ·ç«¯æ¯éš”一段时间向服务端发送一个心跳消息
      this.sendMessage('0', Date.now());
      this.waitingServer();
    }, time);
  }
  //在客户端发送消息之后,延时等待服务器响应,通过webSocketState判断是否连线成功
  waitingServer() {
    this.webSocketState = false;
    setTimeout(() => {
      // è¿žçº¿æˆåŠŸçŠ¶æ€ä¸‹ ç»§ç»­å¿ƒè·³æ£€æµ‹
      if (this.webSocketState) {
        this.startHeartBeat(this.heartBeatConfig.time);
        return;
      }
      console.log('websocket å¿ƒè·³æ— å“åº”, å·²ç»å’ŒæœåŠ¡ç«¯æ–­çº¿');
      // é‡æ–°è¿žæŽ¥æ—¶ï¼Œè®°å¾—要先关闭当前连接
      try {
        this.close();
      } catch (error) {
        console.log('websocket å½“前连接已经关闭');
      }
      // // é‡æ–°è¿žæŽ¥
      // this.reconnectWebSocket()
    }, this.heartBeatConfig.timeout);
  }
  // é‡æ–°è¿žæŽ¥
  reconnectWebSocket() {
    // åˆ¤æ–­æ˜¯å¦æ˜¯é‡æ–°è¿žæŽ¥çŠ¶æ€(即被动状态断线),如果是主动断线的不需要重新连接
    if (!this.isReconnect) {
      return;
    }
    // æ ¹æ®ä¼ å…¥çš„æ–­çº¿é‡è¿žæ—¶é—´é—´éš” å»¶æ—¶è¿žæŽ¥
    this.reconnectTimer = setTimeout(() => {
      // è§¦å‘重新连接函数
      console.log('====onerror websocket尝试重连====');
      this.reconnectFunc();
    }, this.heartBeatConfig.reconnect);
  }
}
export { FYWebSocket };
src/socket/MessageManager.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,27 @@
import eventBus from './eventBus.js';
class MessageManager {
  constructor() {
    this.messageHandler = eventBus;
  }
  /**
   * æ³¨å†Œäº‹ä»¶
   * @param {*} eventType äº‹ä»¶ç±»åž‹
   * @param {*} handler äº‹ä»¶å¤„理函数
   */
  register(eventType, handler) {
    this.messageHandler.register(eventType, handler);
  }
  /**
   * å‘送事件
   * @param {*} eventType äº‹ä»¶ç±»åž‹
   * @param  {...any} ars äº‹ä»¶å†…容
   */
  emit(eventType, ...ars) {
    this.messageHandler.emit(eventType, ...ars);
  }
}
const instance = new MessageManager();
Object.freeze(instance); // é˜²æ­¢ä¿®æ”¹å®žä¾‹
export default instance;
src/socket/eventBus.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,26 @@
// è§‚察者模式
class EventBus {
    constructor() {
      // æ¶ˆæ¯ä¸­å¿ƒï¼Œè®°å½•了所有的事件 ä»¥åŠ äº‹ä»¶å¯¹åº”的处理函数
      this.subs = Object.create(null)
    }
    // æ³¨å†Œæ—¶é—´
    // å‚数:1.事件名称  2.事件处理函数
    register(eventType, handler) {
      this.subs[eventType] = this.subs[eventType] || []
      this.subs[eventType].push(handler)
    }
    // è§¦å‘事件
    // å‚数: 1.事件名称 2.接收的参数
    emit(eventType, ...ars) {
      if(this.subs[eventType]) {
        this.subs[eventType].forEach(handler => {
          handler(...ars)
        })
      }
    }
  }
  export default new EventBus()
src/socket/index.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,20 @@
import { FYWebSocket } from '@/socket/FYWebSocket.js';
import MessageManager from '@/socket/MessageManager.js';
let socket = null;
// è¿žæŽ¥websocket
function connectWebSocket() {
  socket = new FYWebSocket();
  socket.init(() => {
    connectWebSocket();
  }, MessageManager);
  return socket;
}
/**
 * åˆå§‹åŒ–socket客户端
 */
function initSocketClient() {
  connectWebSocket();
}
export { initSocketClient };
src/socket/socketMessage.js
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,51 @@
// å¼€å§‹ç¬¦å·å’Œç»“束符号分别为 '##' å’Œ '%%', åˆ†éš”符为 &&
// å¼€å§‹ç¬¦å·
const startStr = '##';
// åˆ†éš”符号
const splitStr = '&&';
// ç»“束符号
const endStr = '%%';
// æ ¡éªŒæ ¼å¼
function verificationMessage(message) {
  if (!message || message == '') {
    return false;
  }
  if (!(typeof message == 'string')) {
    return false;
  }
  if (!message.startsWith(startStr)) {
    return false;
  }
  if (!message.endsWith(endStr)) {
    return false;
  }
  return true;
}
/**
 * è§£æžå‡ºç±»åž‹å’Œå†…容
 * @param {*} message socket消息中的data字段
 * @returns
 */
function decodeMessage(message) {
  if (!verificationMessage(message)) {
    return;
  }
  const parts = message.slice(startStr.length, -endStr.length).split(splitStr);
  const type = parts[0];
  let data = JSON.parse(parts[1]);
  return {
    type: type,
    data: data
  };
}
/**
 * ç”ŸæˆæŒ‡å®šæ ¼å¼çš„æ¶ˆæ¯å­—符串
 * @param {*} type æ¶ˆæ¯ç±»åž‹
 * @param {*} data æ¶ˆæ¯å†…容
 * @returns ç”Ÿæˆçš„æ¶ˆæ¯å­—符串
 */
function encodeMessage(type, data) {
  return `${startStr}${type}${splitStr}${JSON.stringify(data)}${endStr}`;
}
export { verificationMessage, decodeMessage, encodeMessage };