package cn.flightfeather.supervision.common.executor
|
|
import cn.flightfeather.supervision.common.exception.ResponseErrorException
|
import org.springframework.stereotype.Component
|
import java.time.LocalDateTime
|
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.Executors
|
import kotlin.jvm.Throws
|
|
/**
|
* 后台耗时任务精确管理
|
*/
|
@Component
|
class BackgroundTaskCtrl {
|
|
// 任务列表
|
private val taskCollection = ConcurrentHashMap<BgTaskType, ConcurrentHashMap<String, BgTask>>()
|
// 线程池
|
private val executorService = Executors.newCachedThreadPool()
|
|
/**
|
* 新增任务
|
*/
|
@Throws(ResponseErrorException::class)
|
fun newTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask {
|
if (!taskCollection.containsKey(type)) {
|
taskCollection[type] = ConcurrentHashMap<String, BgTask>()
|
}
|
val taskSet = taskCollection[type]!!
|
if (taskSet.containsKey(id)) throw ResponseErrorException("无法创建任务, 任务[${name}]的id重复")
|
val t = BgTask(type, id, name, task)
|
taskSet[id] = t
|
return t
|
}
|
|
/**
|
* 开始任务
|
*/
|
@Throws(ResponseErrorException::class)
|
fun startTask(type: BgTaskType, id: String): BgTask {
|
val taskSet = taskCollection[type] ?: throw throw ResponseErrorException("无法开启任务,该任务类型[${type.des}]不存在")
|
val t = taskSet[id] ?: throw ResponseErrorException("无法开启任务,该任务[${id}]不存在")
|
return startTask(t)
|
}
|
|
@Throws(ResponseErrorException::class)
|
fun startTask(task: BgTask): BgTask {
|
if (task.taskStatus.status != TaskStatus.WAITING) {
|
if (task.taskStatus.status == TaskStatus.RUNNING) {
|
throw ResponseErrorException("无法开启任务,任务[${task.name}]正在执行")
|
} else {
|
throw ResponseErrorException("无法开启任务,任务[${task.name}]已结束")
|
}
|
} else {
|
task.ready()
|
task.future = executorService.submit { task.execute() }
|
return task
|
}
|
}
|
|
/**
|
* 新增并开始任务
|
*/
|
@Throws(ResponseErrorException::class)
|
fun startNewTask(type: BgTaskType, id: String, name: String, task: () -> Boolean): BgTask {
|
val t = newTask(type, id, name, task)
|
return startTask(t)
|
}
|
|
/**
|
* 获取任务状态
|
*/
|
fun getTaskStatus(type: BgTaskType?, id: String? = null): List<BgTaskStatus?> {
|
if (type != null) {
|
val set = taskCollection[type] ?: return emptyList()
|
return if (id == null) {
|
set.toList().map {
|
it.second.taskStatus
|
}.sortedBy { it.createTime }
|
} else {
|
val s = set[id]?.taskStatus
|
listOf(s)
|
}
|
} else {
|
val res = mutableListOf<BgTaskStatus?>()
|
taskCollection.forEach { (t, u) ->
|
val statusList = u.toList().map {
|
it.second.taskStatus
|
}
|
res.addAll(statusList)
|
}
|
return res.sortedBy { it?.createTime }
|
}
|
}
|
|
/**
|
* 强制关闭任务
|
*/
|
@Throws(ResponseErrorException::class)
|
fun shutDownTask(type: BgTaskType, id: String?): List<BgTaskStatus?> {
|
val taskMap = taskCollection[type] ?: throw ResponseErrorException("无法关闭任务,任务类型[${type.des}]未创建")
|
return if (id != null) {
|
val task = taskMap[id] ?: throw ResponseErrorException("无法关闭任务,任务[${id}]不存在")
|
task.shutdown()
|
listOf(task.taskStatus)
|
} else {
|
val res = mutableListOf<BgTaskStatus?>()
|
taskMap.forEach { (t, u) ->
|
u.shutdown()
|
res.add(u.taskStatus)
|
}
|
res.sortedBy { it?.createTime }
|
}
|
}
|
|
@Throws(ResponseErrorException::class)
|
fun removeTask(type: BgTaskType, id: String): Boolean {
|
val statusList = shutDownTask(type, id)
|
if (statusList.isNotEmpty()) {
|
val s = statusList.first() ?: throw ResponseErrorException("无法移除任务,任务不存在")
|
taskCollection[s.type]?.remove(s.id)
|
return true
|
}
|
return false
|
}
|
|
}
|