feiyu02
2025-07-29 2e024c986c14943a41f7bfe913cfef0cede64198
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.flightfeather.uav.biz.datafetch
 
import com.flightfeather.uav.common.exception.BizException
import com.flightfeather.uav.common.net.ShenXinService
import com.flightfeather.uav.domain.entity.BaseRealTimeData
import com.flightfeather.uav.domain.entity.Mission
import com.flightfeather.uav.domain.repository.MissionRep
import com.flightfeather.uav.domain.repository.RealTimeDataRep
import com.flightfeather.uav.socket.eunm.UWDeviceType
import org.springframework.stereotype.Component
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.ConcurrentHashMap
 
/**
 * 申欣环保走航监测数据获取
 * 每台设备只有一个线程,确保不会重复获取
 * @date 2024/8/23
 * @author feiyu02
 */
@Component
class ShenXinDataFetch(private val missionRep: MissionRep, private val realTimeDataRep: RealTimeDataRep) {
 
    // 设备是否正在获取最新数据
    private val deviceStatusMap = ConcurrentHashMap<String, Boolean>()
    // 走航任务是否正在获取数据
    private val missionStatusMap = ConcurrentHashMap<String, Boolean>()
 
    /**
     * 获取最新的数据
     * @param deviceType 设备类型
     * @param code 设备编号
     */
    private fun fetchLatestData(deviceType: UWDeviceType, code: String): List<BaseRealTimeData> {
        if (deviceStatusMap.containsKey(code)) {
            // 设备正在获取实时数据,则直接返回,放弃本次请求
            if (deviceStatusMap[code] == true) {
                return emptyList()
            }
            // 否则开始获取数据,修改状态为true
            else {
                deviceStatusMap[code] = true
            }
        }
        // 当设备首次获取数据时,直接开始并修改状态为true
        else {
            deviceStatusMap[code] = true
        }
 
        try {
            val latestData = realTimeDataRep.fetchData(deviceType, code, null, null, null, 1, 1)
            val st = if (latestData.isEmpty()) {
                LocalDate.now().atStartOfDay()
            } else {
                LocalDateTime.ofInstant(latestData[0].dataTime?.toInstant(), ZoneId.systemDefault()).plusSeconds(1)
            }
            val data = ShenXinService.fetchData(code, st, LocalDateTime.now())
            if (data.isNotEmpty()) {
                realTimeDataRep.saveData(deviceType, data)
            }
            return data
        } finally {
            // 设备完成数据获取后,修改状态为false
            deviceStatusMap[code] = false
        }
    }
 
    /**
     * 获取给定时间范围内的数据
     */
    fun fetchLatestData(deviceType: UWDeviceType, code: String, sTime: LocalDateTime?, eTime: LocalDateTime?)
            : List<BaseRealTimeData> {
        if (sTime != null && eTime != null) {
            val data = ShenXinService.fetchData(code, sTime, eTime)
            realTimeDataRep.deleteData(deviceType, code, sTime, eTime)
            realTimeDataRep.saveData(deviceType, data)
            return data
        } else if (sTime == null && eTime == null) {
            return fetchLatestData(deviceType, code)
        } else {
            throw BizException("开始和结束时间需要都省略或者都填写")
        }
    }
 
    /**
     * 获取任务的所有数据
     * @param mission 任务信息
     * @param forceUpdate 强制将获取的数据替换原有的数据
     */
    fun fetchMissionData(mission: Mission, forceUpdate: Boolean = false) {
        if (missionStatusMap.containsKey(mission.missionCode)) {
            // 走航任务正在获取数据,则直接返回,放弃本次请求
            if (missionStatusMap[mission.missionCode] == true) {
                return
            }
            // 否则开始获取数据,修改状态为true
            else {
                missionStatusMap[mission.missionCode] = true
            }
        }
        // 当走航任务首次获取数据时,直接开始并修改状态为true
        else {
            missionStatusMap[mission.missionCode] = true
        }
 
        try {
            val st = LocalDateTime.ofInstant(mission.startTime?.toInstant(), ZoneId.systemDefault())
            val et = LocalDateTime.ofInstant(mission.endTime?.toInstant(), ZoneId.systemDefault())
            val data = ShenXinService.fetchData(mission.deviceCode, st, et)
            val type = UWDeviceType.fromValue(mission.deviceType)
            val oldData = realTimeDataRep.fetchData(mission)
            // 判断新数据数据量是否等于旧数据,若不相等,则简单认为数据有变动;或者设置了强制更新
            if (data.size != oldData.size || forceUpdate) {
                if (oldData.isNotEmpty()) {
                    realTimeDataRep.deleteData(mission)
                }
                realTimeDataRep.saveData(type, data)
            }
        } finally {
            // 走航任务完成数据获取后,修改状态为false
            missionStatusMap[mission.missionCode] = false
        }
    }
 
    /**
     * 获取任务的所有数据
     * @param missionCode 任务编号
     * @param forceUpdate 强制将获取的数据替换原有的数据
     */
    fun fetchMissionData(missionCode: String, forceUpdate: Boolean = false) {
        val mission = missionRep.findOne(missionCode) ?: throw BizException("任务不存在,无法获取第三方数据")
        fetchMissionData(mission, forceUpdate)
    }
 
 
}