package cn.flightfeather.supervision.websocket
|
|
import cn.flightfeather.supervision.domain.entity.MeetingInfo
|
import cn.flightfeather.supervision.domain.entity.Participant
|
import cn.flightfeather.supervision.domain.entity.VMRoom
|
import cn.flightfeather.supervision.domain.enumeration.ParticipantType
|
import cn.flightfeather.supervision.domain.mapper.UserinfoMapper
|
import cn.flightfeather.supervision.infrastructure.utils.DateUtil
|
import cn.flightfeather.supervision.infrastructure.utils.UUIDGenerator
|
import cn.flightfeather.supervision.lightshare.repository.MeetingParticipantRepository
|
import cn.flightfeather.supervision.lightshare.repository.MeetingRepository
|
import cn.flightfeather.supervision.lightshare.vo.UserStatusVo
|
import com.google.gson.Gson
|
import com.google.gson.JsonParser
|
import org.java_websocket.WebSocket
|
import org.java_websocket.exceptions.InvalidDataException
|
import org.java_websocket.framing.CloseFrame
|
import org.java_websocket.handshake.ClientHandshake
|
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.boot.json.GsonJsonParser
|
import org.springframework.stereotype.Component
|
import java.util.*
|
import java.util.concurrent.Executors
|
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.TimeUnit
|
import javax.annotation.PostConstruct
|
|
/**
|
* @author riku
|
* Date: 2019/11/15
|
*/
|
@Component
|
class Processor {
|
|
// Class-level reference to the Processor bean; assigned from init() and read by save().
companion object {
|
private lateinit var instance:Processor
|
}
|
|
@Autowired
|
lateinit var meetingRepository: MeetingRepository
|
@Autowired
|
lateinit var meetingParticipantRepository: MeetingParticipantRepository
|
|
@Autowired
|
lateinit var userInfoMapper: UserinfoMapper
|
|
// User connections grouped by room key ("<meetingId>-<roomId>")
|
private val socketMapByMeetingRoom = mutableMapOf<String, MutableList<WebSocket>>()
|
// User status per room key, keyed by user id
|
private val userStatusMapByMeetingRoom = mutableMapOf<String, MutableMap<String, UserStatusVo>>()
|
// Meeting information per room key: online count, registered (signed-in) count, etc.
|
private val meetingInfoMap = mutableMapOf<String, ServerMsgVo>()
|
|
// Periodic broadcast tasks grouped by room key ("<meetingId>-<roomId>")
|
private val timingTaskMap = mutableMapOf<String, ScheduledExecutorService>()
|
|
// Scheduled in-memory cleanup task
|
private val cleaningTask = Executors.newSingleThreadScheduledExecutor()
|
|
private var server: VMRoomWebSocketServer? = null
|
|
/**
 * Publishes this bean through the companion `instance` reference and schedules an hourly
 * cleanup job (first run after one hour) that discards every room whose socket list is empty.
 */
@PostConstruct
fun init() {
    // `instance` is this very object, so its injected repositories need no re-assignment.
    instance = this

    cleaningTask.scheduleAtFixedRate({
        synchronized(socketMapByMeetingRoom) {
            // Collect the keys first so the map is not modified while it is being iterated.
            val emptyRoomKeys = socketMapByMeetingRoom.filterValues { it.isEmpty() }.keys.toList()
            emptyRoomKeys.forEach { key ->
                removeMeeting(key)
                // Shut the room's periodic broadcaster down as well; only dropping the map
                // entry would leave the executor's worker thread running with no owner.
                timingTaskMap.remove(key)?.shutdown()
            }
        }
    }, 1, 1, TimeUnit.HOURS)
}
|
|
/**
 * Registers a newly opened connection with the meeting room named in its handshake.
 *
 * The `Cookie` header value is parsed as a JSON object that must contain the string fields
 * `meetingId`, `roomId`, `userName`, `userId` and `device`. The socket is tagged with the
 * attachment `"<meetingId>-<roomId>;<userId>-<device>"`, added to the room (the room state is
 * created on first use) and a join notice is broadcast to the room.
 *
 * @param server server owning the connection; stored on the first call for later broadcasts
 * @param wb the new connection; the cookie is still validated but nothing is registered when null
 * @param p1 the client handshake; nothing happens when null
 * @throws InvalidDataException with [CloseFrame.POLICY_VALIDATION] when the cookie cannot be
 * parsed or registration fails; the original failure is attached as the cause
 */
fun register(server: VMRoomWebSocketServer, wb: WebSocket?, p1: ClientHandshake?) {
    if (this.server == null) {
        this.server = server
    }
    val handshake = p1 ?: return
    val cookieStr = handshake.getFieldValue("Cookie")
    try {
        // Read the identifying fields from the request header and build the room/socket tags.
        val token = GsonJsonParser().parseMap(cookieStr)
        val meetingId = token["meetingId"] as String
        val roomId = token["roomId"] as String
        val name = token["userName"] as String
        val id = token["userId"] as String
        val device = token["device"] as String

        val attachmentId = "$meetingId-$roomId"
        val wbAttachmentId = "$meetingId-$roomId;$id-$device"

        // Bind the connection to its meeting room using the tags built above.
        val socket = wb ?: return
        socket.setAttachment(wbAttachmentId)
        synchronized(socketMapByMeetingRoom) {
            if (!socketMapByMeetingRoom.containsKey(attachmentId)) {
                val meeting = meetingRepository.getMeeting(meetingId)
                createMeetingList(attachmentId)
                // Meeting setting: free speech or muted by default (applies to ordinary
                // attendees; hosts and guests are never muted by default).
                val isMute = meeting?.miExtension3 == "true"
                meetingInfoMap[attachmentId] = ServerMsgVo(meetingId, 1, mute = isMute, meeting = meeting)
            }
            addUser(attachmentId, socket, id, device)
            // Announce the newcomer to the room.
            meetingInfoMap[attachmentId]?.let { info ->
                val serverMsg = ServerMsgVo(info.meetingId, info.onlineNum, info.registerNum).apply {
                    text = "用户 $name 加入会议室"
                }
                this.broadcastToAll(attachmentId, serverMsg)
            }
        }
    } catch (e: Exception) {
        // Keep the root cause so a rejected handshake can be diagnosed; JVM Errors are no
        // longer converted into a policy rejection.
        throw InvalidDataException(CloseFrame.POLICY_VALIDATION, "Not accepted!", e)
    }
}
|
|
/**
 * Removes a closing connection from its meeting room.
 *
 * A null socket, or a socket that carries no attachment (it was never registered), is ignored
 * instead of failing on the cast to `String`.
 *
 * @param wb the connection that is closing
 */
fun exit(wb: WebSocket?) {
    val socket = wb ?: return
    val attachment: String? = socket.getAttachment<String>()
    // The room key is the part of the attachment before ';'.
    val attachmentId = attachment?.substringBefore(";") ?: return
    synchronized(socketMapByMeetingRoom) {
        if (socketMapByMeetingRoom.containsKey(attachmentId)) {
            removeUser(attachmentId, socket)
        }
    }
}
|
|
/**
 * Wraps a payload in a single-element [WebSocketMsg] of the given type.
 *
 * When [serverMsg] is null a default [ServerMsgVo] is built from the room's cached online and
 * register counts (0 when the room is unknown) and stamped with a fresh uuid, the current time,
 * the "system"/"admin" sender and the `Sended` status. A supplied [serverMsg] is used as is.
 */
fun <T : MeetingMsgVo> newServerMessage(attachmentId: String, meetingId: String, msgType: MsgType, serverMsg: T?): WebSocketMsg<MeetingMsgVo> {
    val payload: MeetingMsgVo = serverMsg ?: run {
        val roomInfo = meetingInfoMap[attachmentId]
        val systemMsg = ServerMsgVo(meetingId, roomInfo?.onlineNum ?: 0, roomInfo?.registerNum ?: 0)
        systemMsg.uuid = UUIDGenerator.generate16ShortUUID()
        systemMsg.time = DateUtil.DateToString(Date(), DateUtil.DateStyle.YYYY_MM_DD_HH_MM_SS)
        systemMsg.userId = "system"
        systemMsg.userName = "admin"
        systemMsg.status = MsgStatus.Sended.value
        systemMsg
    }
    return WebSocketMsg(msgType.value, listOf(payload))
}
|
|
|
/**
 * Sends [serverMsg] (or a default system message when it is null) to everyone in the sender's
 * room except the sender itself. Does nothing when no server has been registered yet.
 *
 * @param wb the sender's connection; its attachment identifies the room
 */
fun <T : MeetingMsgVo> broadcastToOthers(wb: WebSocket?, serverMsg: T?, msgType: MsgType = MsgType.MsgServer) {
    val roomKey = getSocketMapId(wb?.getAttachment() as String)
    val envelope = newServerMessage(roomKey, getMeetingId(roomKey), msgType, serverMsg)
    val activeServer = server ?: return
    this.broadcast(activeServer, wb, envelope, msgType)
}
|
|
/**
 * Sends [serverMsg] (or a default system message when it is null) to every connection in the
 * room identified by [attachmentId].
 *
 * Nothing is sent when no server has been registered yet or when the room no longer exists
 * (the server's `broadcast` is not handed a null client collection).
 *
 * @param attachmentId room key in the form "<meetingId>-<roomId>"
 */
fun <T : MeetingMsgVo> broadcastToAll(attachmentId: String, serverMsg: T?, msgType: MsgType = MsgType.MsgServer) {
    val activeServer = server ?: return
    // Snapshot the recipients under the same lock that guards every mutation of the socket
    // lists; this method is also invoked from the periodic timer threads.
    val recipients = synchronized(socketMapByMeetingRoom) {
        socketMapByMeetingRoom[attachmentId]?.toList()
    } ?: return
    val meetingId = getMeetingId(attachmentId)
    val webSocketMsg = newServerMessage(attachmentId, meetingId, msgType, serverMsg)
    val text = Gson().toJson(webSocketMsg)
    activeServer.broadcast(text, recipients)
}
|
|
/**
 * Broadcasts a system message to the sender's room.
 *
 * The payload ([serverMsg], or a default [ServerMsgVo] when it is null) is always stamped with a
 * fresh uuid, the current time, the "system"/"admin" sender and the `Sended` status. Nothing is
 * sent unless the room key splits into exactly two parts on '-'.
 *
 * @param isExceptSender whether the connection that triggered the message ([wb]) is excluded
 */
fun <T : MeetingMsgVo> broadcastServerMsg(server: VMRoomWebSocketServer, wb: WebSocket?, serverMsg: T?,
                                          isExceptSender: Boolean, msgType: MsgType = MsgType.MsgServer) {
    val roomKey = (wb?.getAttachment() as String).split(";")[0]
    val keyParts = roomKey.split("-")
    if (keyParts.size != 2) {
        return
    }

    val roomInfo = meetingInfoMap[roomKey]
    val payload: MeetingMsgVo = serverMsg
        ?: ServerMsgVo(keyParts[0], roomInfo?.onlineNum ?: 0, roomInfo?.registerNum ?: 0)
    payload.uuid = UUIDGenerator.generate16ShortUUID()
    payload.time = DateUtil.DateToString(Date(), DateUtil.DateStyle.YYYY_MM_DD_HH_MM_SS)
    payload.userId = "system"
    payload.userName = "admin"
    payload.status = MsgStatus.Sended.value

    val envelope = WebSocketMsg(msgType.value, listOf(payload))
    if (!isExceptSender) {
        val json = Gson().toJson(envelope)
        server.broadcast(json, socketMapByMeetingRoom[roomKey])
        return
    }
    this.broadcast(server, wb, envelope, msgType)
}
|
|
/**
 * Acknowledges a received message to the client that sent it: the message is re-typed as
 * `MsgConfirm`, every contained item is marked `Sended`, and the JSON form is sent back on [wb]
 * (skipped when [wb] is null).
 */
fun reply(wb: WebSocket?, msg: WebSocketMsg<MeetingMsgVo>) {
    msg.msgType = MsgType.MsgConfirm.value
    for (item in msg.msgVoList) {
        item.status = MsgStatus.Sended.value
    }
    val json = Gson().toJson(msg)
    if (wb != null) {
        wb.send(json)
    }
}
|
|
/**
 * Forwards a client's message to everyone else in the sender's meeting room.
 *
 * @param wb the sender's socket channel; it is removed from the recipient list
 * @param msg the message; its `msgType` is overwritten with [msgType] before sending
 */
fun <T:MeetingMsgVo> broadcast(server: VMRoomWebSocketServer, wb: WebSocket?, msg: WebSocketMsg<T>, msgType: MsgType) {
    val roomKey = (wb?.getAttachment() as String).split(";")[0]
    // Work on a copy so the shared room list itself is left untouched.
    val recipients = socketMapByMeetingRoom[roomKey].orEmpty().toMutableList()
    recipients.remove(wb)
    msg.msgType = msgType.value
    server.broadcast(Gson().toJson(msg), recipients)
}
|
|
/**
 * Marks every message in [msgVoList] as `Sended` and persists them as meeting records, skipping
 * those whose media type is `RequestSpeak` or `Notification`.
 *
 * @param wb the sender's connection; its attachment supplies the meeting id and room id
 */
fun save(wb: WebSocket?, msgVoList: List<MeetingMsgVo>) {
    val roomKey = (wb?.getAttachment() as String).split(";")[0]
    val records = msgVoList
        .onEach { it.status = MsgStatus.Sended.value }
        .filterTo(mutableListOf<MeetingMsgVo>()) { vo ->
            !(vo.mediaType == MediaType.RequestSpeak.value || vo.mediaType == MediaType.Notification.value)
        }
    val keyParts = roomKey.split("-")
    instance.meetingRepository.saveMeetingRecords(keyParts[0], keyParts[1], records)
}
|
|
/**
 * Starts the periodic server system message for the sender's room: a default system message is
 * broadcast to the whole room immediately and then every 30 seconds.
 *
 * If the room already has a running scheduler nothing is done. If the stored scheduler has been
 * shut down it is replaced by a new one, because a shut-down executor rejects new tasks.
 *
 * @param wb a connection in the room; its attachment identifies the room
 */
fun startTimingServerMsg(server: VMRoomWebSocketServer, wb: WebSocket?) {
    val attachmentId = (wb?.getAttachment() as String).split(";")[0]

    val current = timingTaskMap[attachmentId]
    if (current != null && !current.isShutdown) {
        // Already broadcasting for this room.
        return
    }

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate({
        this.broadcastToAll(attachmentId, null)
    }, 0, 30, TimeUnit.SECONDS)
    // Always store the scheduler so stopTimingServerMsg and the cleanup job can shut it down.
    timingTaskMap[attachmentId] = scheduler
}
|
|
/**
 * Stops the periodic server system message for the sender's room, but only when the room exists
 * and has no connections left.
 *
 * @param wb a connection of the room; its attachment identifies the room
 */
fun stopTimingServerMsg(server: VMRoomWebSocketServer, wb: WebSocket?) {
    val roomKey = (wb?.getAttachment() as String).split(";")[0]
    val roomIsEmpty = socketMapByMeetingRoom[roomKey]?.isEmpty() ?: false
    if (!roomIsEmpty) {
        return
    }
    timingTaskMap.remove(roomKey)?.shutdown()
}
|
|
/**
 * Parses a JSON text frame into a [WebSocketMsg].
 *
 * The JSON object must have an integer `msgType` and an array `msgVoList`; each array element is
 * deserialized as [clazz].
 */
fun <T:MeetingMsgVo> parseToWebSocketMsg(text: String?, clazz: Class<out T>):WebSocketMsg<T> {
    val root = JsonParser.parseString(text).asJsonObject
    val type = root.get("msgType").asInt
    val items = mutableListOf<T>()
    for (element in root.getAsJsonArray("msgVoList")) {
        items.add(Gson().fromJson(element, clazz) as T)
    }
    return WebSocketMsg(type, items)
}
|
|
/**
 * Returns the user-status map of the given room, keyed by user id, or an empty map when the
 * room is unknown.
 */
fun getOnlineUsers(meetingId: String, roomId: String): Map<String, UserStatusVo> =
    userStatusMapByMeetingRoom["$meetingId-$roomId"].orEmpty()
|
|
/**
 * Refreshes the cached status of one participant after the attendee list changed.
 *
 * Only acts when the room and the participant already have a cached status. The participant type
 * and mute flag are recomputed, and the new mute status is broadcast if the user is online.
 *
 * @param isDelete true when the participant was removed from the meeting (type becomes `Others`),
 * false when the participant was added or updated
 */
fun refreshUserInfo(meetingId: String, roomId: String, participant: Participant, isDelete:Boolean) {
    val roomKey = "$meetingId-$roomId"
    val roomUsers = userStatusMapByMeetingRoom[roomKey] ?: return
    val status = roomUsers[participant.mpParticipantid] ?: return

    status.participantType = when {
        isDelete -> ParticipantType.Others.value.toByte()
        else -> participant.mpPersontype?.toByte() ?: ParticipantType.Others.value.toByte()
    }
    // Chairs and guests are never muted; everyone else follows the room setting (muted when unknown).
    status.mute = !isMeetingAdmin(status.participantType) && (meetingInfoMap[roomKey]?.mute ?: true)

    // Only users who have entered the room and are currently online get the mute status broadcast.
    if (status.online) {
        broadMuteStatus(meetingId, roomKey, listOf(status))
    }
}
|
|
/**
 * Refreshes the cached meeting information of every listed room from [meeting].
 *
 * For each room that has cached info:
 * - a non-null `miRegisternum` updates the cached register count;
 * - a non-null `miExtension3` updates the room mute flag ("true" means muted), applies it to the
 *   cached user statuses and broadcasts the changed statuses. When muting, users for whom
 *   `isMeetingAdmin` is true are left untouched; when unmuting, every user is unmuted.
 */
fun refreshMeetingInfo(meeting: MeetingInfo, roomList: List<VMRoom>) {
    for (room in roomList) {
        val attachmentId = "${meeting.miGuid}-${room.vmrGuid}"
        val info = meetingInfoMap[attachmentId] ?: continue

        meeting.miRegisternum?.let { n ->
            // Assign the new count (the previous `==` only compared and discarded the result).
            info.registerNum = n
            info.meeting?.miRegisternum = n
        }

        meeting.miExtension3?.let { muteFlag ->
            info.mute = muteFlag == "true"
            info.meeting?.miExtension3 = muteFlag

            val changed = mutableListOf<UserStatusVo>()
            userStatusMapByMeetingRoom[attachmentId]?.values?.forEach { status ->
                if (!info.mute || !isMeetingAdmin(status.participantType)) {
                    status.mute = info.mute
                    changed.add(status)
                }
            }
            broadMuteStatus(meeting.miGuid, attachmentId, changed)
        }
    }
}
|
|
/**
 * Applies the mute flags in [userStatusList] to the cached statuses of the given room. Entries
 * without a user id, or for users with no cached status, are skipped.
 */
fun setUserMute(meetingId: String, roomId: String, userStatusList: List<UserStatusVo>) {
    val statusMap = userStatusMapByMeetingRoom["$meetingId-$roomId"] ?: return
    for (update in userStatusList) {
        val id = update.userId ?: continue
        statusMap[id]?.mute = update.mute
    }
}
|
|
/**
 * Broadcasts the mute status of the given users to the whole room as a `MsgPersonal` message.
 *
 * The message carries a userId-to-mute map and the ids of users whose `callName` flag is set; a
 * missing user id is represented by the empty string.
 *
 * @param userId when non-null, set as the `userId` of the outgoing message
 */
fun broadMuteStatus(meetingId: String, attachmentId: String, userStatusList: List<UserStatusVo>, userId: String? = null) {
    val muteByUser = hashMapOf<String, Boolean>()
    val calledUsers = mutableListOf<String>()
    for (status in userStatusList) {
        muteByUser[status.userId ?: ""] = status.mute
        if (status.callName) {
            calledUsers.add(status.userId ?: "")
        }
    }

    val message = PersonalServerMsgVo(meetingId, muteByUser, calledUsers)
    if (userId != null) {
        message.userId = userId
    }

    this.broadcastToAll(attachmentId, message, MsgType.MsgPersonal)
}
|
|
/** Drops the socket list, the user statuses and the cached meeting info stored under [key]. */
private fun removeMeeting(key: String) {
    socketMapByMeetingRoom -= key
    userStatusMapByMeetingRoom -= key
    meetingInfoMap -= key
}
|
|
/**
 * Removes [wb] from the room's socket list, updates the user's online flag and the room's online
 * count, and broadcasts the new counts to the room.
 */
private fun removeUser(attachmentId: String, wb: WebSocket) {
    socketMapByMeetingRoom[attachmentId]?.remove(wb)

    // Changed 2020.04.14: after a user leaves, the status entry is kept and only the online flag changes.
    // After a successful reconnect the client drops its old connection; a socket with the same
    // attachment is then still in the room and the online flag must stay as it is.
    val replacedByReconnect = socketMapByMeetingRoom[attachmentId]
        ?.any { other -> other.getAttachment<String>() == wb.getAttachment() }
        ?: false

    // Only a real exit marks the user offline.
    if (!replacedByReconnect) {
        val userId = getUserId(wb.getAttachment<String>())
        userStatusMapByMeetingRoom[attachmentId]?.get(userId)?.online = false
    }

    val info = meetingInfoMap[attachmentId]
    info?.onlineNum = socketMapByMeetingRoom[attachmentId]?.size
        ?: info?.onlineNum?.minus(1) ?: 0

    if (info != null) {
        val countsMsg = ServerMsgVo(info.meetingId, info.onlineNum, info.registerNum)
        this.broadcastToAll(attachmentId, countsMsg)
    }
}
|
|
/** Creates (or resets) the empty socket list and user-status map for the room [attachmentId]. */
private fun createMeetingList(attachmentId: String) {
    socketMapByMeetingRoom.put(attachmentId, mutableListOf())
    userStatusMapByMeetingRoom.put(attachmentId, mutableMapOf())
}
|
|
/**
 * Adds [wb] to the room, creates or updates the user's cached status, broadcasts that user's
 * mute status and refreshes the room's online/register counts.
 */
private fun addUser(attachmentId: String, wb: WebSocket, userId: String, device: String) {
    // De-duplicate on registration: a socket with the same attachment is closed and replaced
    // (the original note describes this as one connection per IP address).
    val sockets = socketMapByMeetingRoom[attachmentId] ?: mutableListOf()
    val duplicateIndex = sockets.indexOfFirst { it.getAttachment<String>() == wb.getAttachment<String>() }
    if (duplicateIndex >= 0) {
        sockets[duplicateIndex].close(CloseFrame.REFUSE)
        sockets[duplicateIndex] = wb
    } else {
        socketMapByMeetingRoom[attachmentId]?.add(wb)
    }

    // If the user already has a status, only the online flag is touched.
    userStatusMapByMeetingRoom[attachmentId]?.let { statusMap ->
        val keyParts = getMeetingInfo(attachmentId)
        val known = statusMap[userId]
        if (known != null) {
            known.online = true
        } else {
            val created = UserStatusVo()
            created.userId = userId
            created.participantType = when {
                keyParts.size == 1 -> getParticipantType(keyParts[0], null, userId)
                keyParts.size >= 2 -> getParticipantType(keyParts[0], keyParts[1], userId)
                else -> ParticipantType.Others.value.toByte()
            }
            // Chairs and guests are never muted; others follow the room setting (muted when unknown).
            created.mute = !isMeetingAdmin(created.participantType) && (meetingInfoMap[attachmentId]?.mute ?: true)
            created.online = true
            statusMap[userId] = created
        }

        broadMuteStatus(keyParts[0], attachmentId, listOf(statusMap[userId] ?: UserStatusVo()))
    }

    meetingInfoMap[attachmentId]?.let { info ->
        socketMapByMeetingRoom[attachmentId]?.let { info.onlineNum = it.size }
        info.registerNum = info.meeting?.miRegisternum ?: 0
    }
}
|
|
/**
 * Tells whether the participant type is one of the meeting's managers (chair or guest).
 */
private fun isMeetingAdmin(participantType: Byte): Boolean =
    participantType == ParticipantType.Chair.value.toByte() ||
        participantType == ParticipantType.Guest.value.toByte()
|
|
/**
 * Looks the participant up in the repository and maps its person type to a participant-type
 * byte. A missing participant or person type is mapped through the value 99; a null [userId] is
 * looked up as the empty string.
 */
private fun getParticipantType(meetingId: String, roomId: String?, userId: String?): Byte {
    val personType = meetingParticipantRepository
        .getParticipant(meetingId, roomId, userId ?: "")
        ?.mpPersontype
        ?.toInt()
    return getParticipantType(personType ?: 99)
}
|
|
/**
 * Normalizes a raw person-type value: the values of `Chair`, `Guest`, `Representative`,
 * `Conferee` and `Others` are returned as bytes, anything else becomes `Others`.
 */
private fun getParticipantType(value: Int): Byte = when (value) {
    ParticipantType.Chair.value,
    ParticipantType.Guest.value,
    ParticipantType.Representative.value,
    ParticipantType.Conferee.value -> value.toByte()
    else -> ParticipantType.Others.value.toByte()
}
|
|
/**
 * Extracts the room key from a socket attachment: the text before the first ';', or the whole
 * string when it contains no ';'.
 */
private fun getSocketMapId(wbAttachmentId: String): String = wbAttachmentId.substringBefore(";")
|
|
/**
 * Extracts the meeting id from a room key or socket attachment: the text before the first '-'
 * within the part before the first ';'. A blank input yields the empty string.
 */
private fun getMeetingId(attachmentId: String): String {
    if (attachmentId.isBlank()) {
        return ""
    }
    return attachmentId.substringBefore(";").substringBefore("-")
}
|
|
/**
 * Splits the room-key part of an attachment (the text before the first ';') on '-'. For a
 * well-formed key the result is `[meetingId, roomId]`. An empty list is returned only if the
 * split throws.
 */
private fun getMeetingInfo(wbAttachmentId: String): List<String> =
    try {
        wbAttachmentId.substringBefore(";").split("-")
    } catch (e: Exception) {
        emptyList()
    }
|
|
/**
 * Extracts the user id from a socket attachment of the form
 * "<meetingId>-<roomId>;<userId>-<device>": the text before the first '-' in the second
 * ';'-separated part. Returns the empty string when there is no second part.
 */
private fun getUserId(attachmentId: String): String =
    attachmentId.split(";").getOrNull(1)?.substringBefore("-") ?: ""
|
}
|