feiyu02
2025-09-12 dc4f12f66685260ac357997680e5f3fe723c3c4a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package cn.flightfeather.supervision.common.executor
 
import cn.flightfeather.supervision.common.exception.BizException
import cn.flightfeather.supervision.socket.WebSocketSendMessageUtil
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.jvm.Throws
 
/**
 * 后台耗时任务精确管理
 */
@Component
class BackgroundTaskCtrl {
 
    // 任务列表
    private val taskCollection = ConcurrentHashMap<BgTaskType, ConcurrentHashMap<String, BgTask>>()
    // 线程池
    private val executorService = Executors.newCachedThreadPool()
 
    /**
     * 新增任务
     */
    @Throws(BizException::class)
    fun newTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask {
        if (!taskCollection.containsKey(type)) {
            taskCollection[type] = ConcurrentHashMap<String, BgTask>()
        }
        val taskSet = taskCollection[type]!!
        // 判断任务id是否存在
        if (taskSet.containsKey(id)) {
            val oldTask = taskSet[id]
            // 当任务已经结束时,直接删除,替换为新任务
            if (oldTask?.taskStatus?.status != TaskStatus.WAITING || oldTask.taskStatus.status != TaskStatus.RUNNING) {
                taskSet.remove(id)
            }
            else {
                throw BizException("无法重复创建任务")
            }
        }
        val t = BgTask(type, id, name, task) { status ->
            // 发送消息
            WebSocketSendMessageUtil.sendBgTaskMessage(status)
        }
        taskSet[id] = t
        return t
    }
 
    /**
     * 开始任务
     */
    @Throws(BizException::class)
    fun startTask(type: BgTaskType, id: String): BgTask {
        val taskSet = taskCollection[type] ?: throw throw BizException("无法开启任务,该任务类型[${type.des}]不存在")
        val t = taskSet[id] ?: throw BizException("无法开启任务,该任务[${id}]不存在")
        return startTask(t)
    }
 
    @Throws(BizException::class)
    fun startTask(task: BgTask): BgTask {
        if (task.taskStatus.status != TaskStatus.WAITING) {
            if (task.taskStatus.status == TaskStatus.RUNNING) {
                throw BizException("无法开启任务,任务[${task.name}]正在执行")
            } else {
                throw BizException("无法开启任务,任务[${task.name}]已结束")
            }
        } else {
            task.ready()
            task.future = executorService.submit {
                task.execute()
            }
            return task
        }
    }
 
    /**
     * 新增并开始任务
     */
    @Throws(BizException::class)
    fun startNewTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask {
        val t = newTask(type, id, name, task)
        return startTask(t)
    }
 
    /**
     * 获取任务状态
     */
    fun getTaskStatus(type: BgTaskType?, id: String? = null): List<BgTaskStatus?> {
        if (type != null) {
            val set = taskCollection[type] ?: return emptyList()
            return if (id == null) {
                set.toList().map {
                    it.second.taskStatus
                }.sortedBy { it.createTime }
            } else {
                val s = set[id]?.taskStatus
                listOf(s)
            }
        } else {
            val res = mutableListOf<BgTaskStatus?>()
            taskCollection.forEach { (t, u) ->
                val statusList = u.toList().map {
                    it.second.taskStatus
                }
                res.addAll(statusList)
            }
            return res.sortedBy { it?.createTime }
        }
    }
 
    /**
     * 强制关闭任务
     */
    @Throws(BizException::class)
    fun shutDownTask(type: BgTaskType, id: String?): List<BgTaskStatus?> {
        val taskMap = taskCollection[type] ?: throw BizException("无法关闭任务,任务类型[${type.des}]未创建")
        return if (id != null) {
            val task = taskMap[id] ?: throw BizException("无法关闭任务,任务[${id}]不存在")
            task.shutdown()
            listOf(task.taskStatus)
        } else {
            val res = mutableListOf<BgTaskStatus?>()
            taskMap.forEach { (t, u) ->
                u.shutdown()
                res.add(u.taskStatus)
            }
            res.sortedBy { it?.createTime }
        }
    }
 
    @Throws(BizException::class)
    fun removeTask(type: BgTaskType, id: String): Boolean {
        val statusList = shutDownTask(type, id)
        if (statusList.isNotEmpty()) {
            val s = statusList.first() ?: throw BizException("无法移除任务,任务不存在")
            taskCollection[s.type]?.remove(s.id)
            return true
        }
        return false
    }
 
}