From 1571cd0f137ced4345fa8785e166a29dc31b6ad1 Mon Sep 17 00:00:00 2001
From: feiyu02 <risaku@163.com>
Date: 星期二, 13 五月 2025 17:42:39 +0800
Subject: [PATCH] 1. 新增动态污染溯源的数据异常判断逻辑 2. 新增动态污染溯源websocket连接功能

---
 src/main/kotlin/com/flightfeather/uav/socket/UnderwaySocketServer.kt |   87 ++++++++++++++++++++++++++++++++++---------
 1 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/src/main/kotlin/com/flightfeather/uav/socket/UnderwaySocketServer.kt b/src/main/kotlin/com/flightfeather/uav/socket/UnderwaySocketServer.kt
index 699f09e..1abff63 100644
--- a/src/main/kotlin/com/flightfeather/uav/socket/UnderwaySocketServer.kt
+++ b/src/main/kotlin/com/flightfeather/uav/socket/UnderwaySocketServer.kt
@@ -1,26 +1,46 @@
 package com.flightfeather.uav.socket
 
+import com.flightfeather.uav.socket.handler.ServerHandler
+import com.flightfeather.uav.socket.handler.UnderwayWebSocketServerHandler
+import com.flightfeather.uav.socket.processor.BaseProcessor
 import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.ChannelHandler
 import io.netty.channel.ChannelInitializer
 import io.netty.channel.ChannelOption
 import io.netty.channel.nio.NioEventLoopGroup
 import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.channel.socket.nio.NioSocketChannel
+import io.netty.handler.codec.LineBasedFrameDecoder
+import io.netty.handler.codec.http.HttpObjectAggregator
+import io.netty.handler.codec.http.HttpServerCodec
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
 import io.netty.handler.codec.string.StringDecoder
 import io.netty.handler.codec.string.StringEncoder
+import org.springframework.stereotype.Component
 import java.nio.charset.Charset
 
 /*********************************************************************************
  * 璧拌埅鐩戞祴鏁版嵁socket闀胯繛鎺ユ湇鍔$
  * 鐢ㄤ簬鎺ユ敹瑙f瀽璧拌埅鐩戞祴鏁版嵁锛屽墠绔洃娴嬭澶囩洰鍓嶅寘鎷溅杞借蛋鑸�佹棤浜烘満璧拌埅浠ュ強鏃犱汉鑸硅蛋鑸笁绉嶇被鍨�
  * *******************************************************************************/
-class UnderwaySocketServer {
+@Component
+class UnderwaySocketServer(
+    private val underwayWebSocketServerHandler:UnderwayWebSocketServerHandler
+) {
 
     private val bossGroup = NioEventLoopGroup()
     private val workerGroup = NioEventLoopGroup()
 
-    fun startServer(port: Int) {
-        initialize()?.bind(port)?.sync()
+    fun startUnderwayServer(port: Int, processor: BaseProcessor) {
+        underwayServer(processor)?.bind(port)?.sync()
+    }
+
+    fun startElectricServer(port: Int, processor: BaseProcessor) {
+        electricServer(processor)?.bind(port)?.sync()
+    }
+
+    fun startWebSocketServer(port: Int, processor: BaseProcessor) {
+        webSocketServer(processor)?.bind(port)?.sync()
     }
 
     fun stopServer() {
@@ -28,28 +48,57 @@
         workerGroup.shutdownGracefully()
     }
 
-    private fun initialize(): ServerBootstrap? {
-
+    private fun newServer(childHandler: ChannelHandler): ServerBootstrap? {
         try {
             return ServerBootstrap()
-                    .group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel::class.java)
-                    .option(ChannelOption.SO_BACKLOG, 128)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true)
-                    .childOption(ChannelOption.TCP_NODELAY, true)
-                    .childHandler(object : ChannelInitializer<NioSocketChannel>() {
-                        override fun initChannel(p0: NioSocketChannel?) {
-                            p0?.pipeline()
-//                                    ?.addLast("decoder", StringDecoder())
-                                    ?.addLast(UAVByteDataDecoder())
-                                    ?.addLast("encoder", StringEncoder(Charset.forName("UTF-8")))
-                                    ?.addLast(ServerHandler())
-                        }
-                    })
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel::class.java)
+                .option(ChannelOption.SO_BACKLOG, 128)
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childHandler(childHandler)
         } catch (e: Throwable) {
             e.printStackTrace()
         }
 
         return null
     }
+
+    /**
+     * 鐢ㄧ數閲忔湇鍔$
+     */
+    private fun electricServer(processor: BaseProcessor): ServerBootstrap? = newServer(object : ChannelInitializer<NioSocketChannel>() {
+        override fun initChannel(p0: NioSocketChannel?) {
+            p0?.pipeline()
+                ?.addLast("lineDecoder", LineBasedFrameDecoder(1024))
+                ?.addLast("stringDecoder", StringDecoder())
+                ?.addLast("encoder", StringEncoder(Charset.forName("UTF-8")))
+                ?.addLast(ServerHandler(processor))
+        }
+    })
+
+    /**
+     * 澶氬弬鏁拌蛋鑸湇鍔$
+     */
+    private fun underwayServer(processor: BaseProcessor):ServerBootstrap? = newServer(object : ChannelInitializer<NioSocketChannel>() {
+        override fun initChannel(p0: NioSocketChannel?) {
+            p0?.pipeline()
+                ?.addLast(UAVByteDataDecoder())
+                ?.addLast("encoder", StringEncoder(Charset.forName("UTF-8")))
+                ?.addLast(ServerHandler(processor))
+        }
+    })
+
+    /**
+     * 澶氬弬鏁拌蛋鑸湇鍔$
+     */
+    private fun webSocketServer(processor: BaseProcessor):ServerBootstrap? = newServer(object : ChannelInitializer<NioSocketChannel>() {
+        override fun initChannel(p0: NioSocketChannel?) {
+            p0?.pipeline()
+                ?.addLast(HttpServerCodec())
+                ?.addLast(HttpObjectAggregator(65535))
+                ?.addLast(WebSocketServerProtocolHandler("/ws"))
+                ?.addLast(underwayWebSocketServerHandler)
+        }
+    })
 }
\ No newline at end of file

--
Gitblit v1.9.3