package com.flightfeather.uav.biz.datafetch
|
|
import com.flightfeather.uav.common.exception.BizException
|
import com.flightfeather.uav.common.net.ShenXinService
|
import com.flightfeather.uav.domain.entity.Mission
|
import com.flightfeather.uav.domain.repository.MissionRep
|
import com.flightfeather.uav.domain.repository.RealTimeDataRep
|
import com.flightfeather.uav.socket.eunm.UWDeviceType
|
import org.springframework.stereotype.Component
|
import java.time.LocalDate
|
import java.time.LocalDateTime
|
import java.time.ZoneId
|
import java.util.concurrent.ConcurrentHashMap
|
|
/**
|
* 申欣环保走航监测数据获取
|
* 每台设备只有一个线程,确保不会重复获取
|
* @date 2024/8/23
|
* @author feiyu02
|
*/
|
@Component
|
class ShenXinDataFetch(private val missionRep: MissionRep, private val realTimeDataRep: RealTimeDataRep) {
|
|
// 设备是否正在获取最新数据
|
private val deviceStatusMap = ConcurrentHashMap<String, Boolean>()
|
// 走航任务是否正在获取数据
|
private val missionStatusMap = ConcurrentHashMap<String, Boolean>()
|
/**
|
* 获取最新的数据
|
* @param deviceType 设备类型
|
* @param code 设备编号
|
*/
|
private fun fetchLatestData(deviceType: UWDeviceType, code: String) {
|
if (deviceStatusMap.containsKey(code)) {
|
// 设备正在获取实时数据,则直接返回,放弃本次请求
|
if (deviceStatusMap[code] == true) {
|
return
|
}
|
// 否则开始获取数据,修改状态为true
|
else {
|
deviceStatusMap[code] = true
|
}
|
}
|
// 当设备首次获取数据时,直接开始并修改状态为true
|
else {
|
deviceStatusMap[code] = true
|
}
|
|
try {
|
val latestData = realTimeDataRep.fetchData(deviceType, code, null, null, null, 1, 1)
|
val st = if (latestData.isEmpty()) {
|
LocalDate.now().atStartOfDay()
|
} else {
|
LocalDateTime.ofInstant(latestData[0].dataTime?.toInstant(), ZoneId.systemDefault()).plusSeconds(1)
|
}
|
val data = ShenXinService.fetchData(code, st, LocalDateTime.now())
|
if (data.isNotEmpty()) {
|
realTimeDataRep.saveData(deviceType, data)
|
}
|
} finally {
|
// 设备完成数据获取后,修改状态为false
|
deviceStatusMap[code] = false
|
}
|
}
|
|
/**
|
* 获取给定时间范围内的数据
|
*/
|
fun fetchLatestData(deviceType: UWDeviceType, code: String, sTime: LocalDateTime?, eTime: LocalDateTime?) {
|
if (sTime != null && eTime != null) {
|
val data = ShenXinService.fetchData(code, sTime, eTime)
|
realTimeDataRep.deleteData(deviceType, code, sTime, eTime)
|
realTimeDataRep.saveData(deviceType, data)
|
} else if (sTime == null && eTime == null) {
|
fetchLatestData(deviceType, code)
|
} else {
|
throw BizException("开始和结束时间需要都省略或者都填写")
|
}
|
}
|
|
/**
|
* 获取任务的所有数据
|
* @param mission 任务信息
|
* @param forceUpdate 强制将获取的数据替换原有的数据
|
*/
|
fun fetchMissionData(mission: Mission, forceUpdate: Boolean = false) {
|
if (missionStatusMap.containsKey(mission.missionCode)) {
|
// 走航任务正在获取数据,则直接返回,放弃本次请求
|
if (missionStatusMap[mission.missionCode] == true) {
|
return
|
}
|
// 否则开始获取数据,修改状态为true
|
else {
|
missionStatusMap[mission.missionCode] = true
|
}
|
}
|
// 当走航任务首次获取数据时,直接开始并修改状态为true
|
else {
|
missionStatusMap[mission.missionCode] = true
|
}
|
|
try {
|
val st = LocalDateTime.ofInstant(mission.startTime?.toInstant(), ZoneId.systemDefault())
|
val et = LocalDateTime.ofInstant(mission.endTime?.toInstant(), ZoneId.systemDefault())
|
val data = ShenXinService.fetchData(mission.deviceCode, st, et)
|
val type = UWDeviceType.fromValue(mission.deviceType)
|
val oldData = realTimeDataRep.fetchData(mission)
|
// 判断新数据数据量是否等于旧数据,若不相等,则简单认为数据有变动;或者设置了强制更新
|
if (data.size != oldData.size || forceUpdate) {
|
if (oldData.isNotEmpty()) {
|
realTimeDataRep.deleteData(mission)
|
}
|
realTimeDataRep.saveData(type, data)
|
}
|
} finally {
|
// 走航任务完成数据获取后,修改状态为false
|
missionStatusMap[mission.missionCode] = false
|
}
|
}
|
|
/**
|
* 获取任务的所有数据
|
* @param missionCode 任务编号
|
* @param forceUpdate 强制将获取的数据替换原有的数据
|
*/
|
fun fetchMissionData(missionCode: String, forceUpdate: Boolean = false) {
|
val mission = missionRep.findOne(missionCode) ?: throw BizException("任务不存在,无法获取第三方数据")
|
fetchMissionData(mission, forceUpdate)
|
}
|
|
|
}
|