#将数据写入设备信息表 分钟数据表 油烟浓度超标表 异常表中
|
|
from pymysql import * # 连接mysql数据库
|
import pandas as pd
|
from sqlalchemy import create_engine
|
import uuid
|
from datetime import datetime, timedelta
|
|
import sys
|
# sys.path.append('D:\\z\workplace\\VsCode\\pyvenv\\venv')
|
sys.path.append('../../')
|
import src.core_modules.remove_duplicates_methods as rdm
|
|
|
#------------------------------------------------------------------------------------------------------------超标油烟数据写入异常表中
|
#两时间是否相差10分钟 是则返回TRUE 否则返回FALSE
|
def is_time_difference_equals_10_mins(datestr1, datestr2):
|
date1 = datetime.strptime(datestr1, "%Y-%m-%d %H:%M")
|
date2 = datetime.strptime(datestr2, "%Y-%m-%d %H:%M")
|
time_diff = date2 - date1
|
|
return time_diff == timedelta(minutes = 10) or time_diff == timedelta(minutes = -10) #timedelta() 表示两个 date 对象或者 time 对象,或者 datetime 对象之间的时间间隔
|
|
|
#每隔十分钟一次为正常。 找出超过10分钟的间断点
|
def find_break_point(list): #list为超标数据的列表
|
i=0
|
j=1
|
break_point = [] #保存间断点
|
for item in list[1:]:
|
if(is_time_difference_equals_10_mins(list[i][2],item[2]) == False):
|
break_point.append(j)
|
i=i+1
|
j=j+1
|
print('间断点为:')
|
print(break_point)
|
|
#写入间断点
|
return break_point
|
|
|
|
#根据间断点将列表分割成几个子列表,由result返回
|
def point_write(list,b_point): #list为列表。b_point列表元素为间断点,间断点值从小到大
|
result = []
|
last_index = 0
|
for index in b_point:
|
result.append(list[last_index:index]) #灵活
|
last_index=index
|
result.append(list[last_index:])
|
return result
|
|
|
#将设备故障信息写入abnormal_data异常表中
|
def abnormal_write_to_SQL(list,con):
|
data = pd.DataFrame(list,columns=['dev_id','exception','exception_type','region','begin_time','end_time'])
|
print("\n\n")
|
print(data)
|
|
|
# test3 要写入的数据表,这样写的话要提前在数据库建好表
|
data.to_sql(name="abnormal_data", con=con, if_exists="append",index=False,index_label=False)
|
|
|
|
|
def exception(list,con): #list为超标数据的列表
|
break_point=find_break_point(list) #返回间断点
|
split_list=point_write(list,break_point) #根据间断点将原始列表分割成几个子列表 split_list为三层数组,形式为[[[1,2],[4,'g']],[[8,'2'],['4','g']],[[1,2],[4,'g']]]
|
|
print('\n')
|
abnormal=[] #重组好的异常表数据
|
|
for item in split_list: #从分割的数组中提取需要的时间信息,并添加新的信息数据
|
temp=[]
|
temp.append(item[0][0]) #设备编号
|
temp.append('数据异常') #设备编号
|
temp.append('0') #油烟浓度超标
|
temp.append('徐汇区')
|
temp.append(item[len(item)-1][2]) #前一条记录的归属时间 开始时间
|
temp.append(item[0][2]) #归属时间 结束时间
|
abnormal.append(temp)
|
|
print(abnormal)
|
|
print('超标异常时间段数据为:')
|
for j in abnormal:
|
print(j)
|
abnormal_write_to_SQL(abnormal,con) #写入异常表中
|
print("超标油烟数据异常表写入完成!")
|
|
#------------------------------------------------------------------------------------------------------------设备故障数据写入异常表中
|
#两时间是否相差30分钟 是则返回TRUE 否则返回FALSE
|
def is_time_difference_equals_30_mins(datestr1, datestr2):
|
date1 = datetime.strptime(datestr1, "%Y-%m-%d %H:%M")
|
date2 = datetime.strptime(datestr2, "%Y-%m-%d %H:%M")
|
time_diff = date2 - date1
|
return time_diff > timedelta(minutes=30)
|
|
#找出设备故障的信息,并将此信息写入异常表中
|
def is_minutes_exceed_30(list,con) : # list为某店铺指定页数的全部的记录 list元素中的时间为倒序排列,即从大到小
|
device_failure=[] #存储设备故障的数据
|
startTime = list[0][11]
|
print('开始时间:',startTime)
|
for item in list[1:] :
|
if is_time_difference_equals_30_mins(item[11],startTime) : #必须大于30分钟 不能等于30分钟
|
temp=[]
|
temp.append(item[2]) #设备编号
|
temp.append('设备故障') #设备编号
|
temp.append('1') #设备故障
|
temp.append('徐汇区')
|
temp.append(item[11]) #故障开始时间
|
startTimeSub= datetime.strptime(startTime,"%Y-%m-%d %H:%M") - timedelta(minutes = 10) #结果为datetime.datetime类型 ,需要再转为字符串类型
|
print('相减后结果:',str(startTimeSub))
|
print('相减后类型:',type(str(startTimeSub)))
|
temp.append(str(startTimeSub)[:16]) #故障结束时间
|
device_failure.append(temp)
|
startTime = item[11]
|
print('设备故障的数据为:')
|
for i in device_failure :
|
print(i)
|
not_Key_period_exceed_30_minutes(device_failure,con) #将供电异常信息写入异常表
|
print('供电异常/掉线信息写入异常表完成!')
|
#-----------------------------------------------------------------------------------------------------------供电异常数据写入异常表中
|
#开始和结束时间都处于非重点时段时,返回true
|
def is_time_not_between_key_period(begin_time,end_time) : #形参为日期字符串,形如 '2023-06-21 14:30'
|
global Key_period_noon_begin,Key_period_noon_end,Key_period_night_begin,Key_period_night_end
|
|
begin1 = datetime.strptime(begin_time[11:],"%H:%M")
|
end1 = datetime.strptime(end_time[11:],"%H:%M")
|
|
#当开始和结束时间都处于非重点时段时,将该条故障信息同时记录为: 疑似供电异常
|
if ((( begin1 > Key_period_noon_begin and begin1 < Key_period_noon_end ) or ( begin1 > Key_period_night_begin and begin1 < Key_period_night_end )) or (( end1 > Key_period_noon_begin and end1 < Key_period_noon_end ) or ( end1 > Key_period_night_begin and end1 < Key_period_night_end ))) ==False :
|
print('开始或结束时间时间在非重点时段')
|
return True
|
print('处于重点时段')
|
return False
|
|
#开始和结束时间都处于重点时段时,返回true
|
def is_time_between_key_period(begin_time,end_time) : #形参为日期字符串,形如 '2023-06-21 14:30'
|
global Key_period_noon_begin,Key_period_noon_end,Key_period_night_begin,Key_period_night_end
|
|
|
begin1 = datetime.strptime(begin_time[11:],"%H:%M")
|
end1 = datetime.strptime(end_time[11:],"%H:%M")
|
|
#当开始和结束时间都处于重点时段时,将该条故障信息同时记录为: 掉线
|
if ((begin1 > Key_period_noon_begin and begin1 < Key_period_noon_end) and ( end1 > Key_period_noon_begin and end1 < Key_period_noon_end )) or ( (begin1 > Key_period_night_begin and begin1 < Key_period_night_end) and ( end1 > Key_period_night_begin and end1 < Key_period_night_end )) :
|
print('开始或结束时间处于重点时段')
|
return True
|
print('处于非重点时段')
|
return False
|
|
|
|
def not_Key_period_exceed_30_minutes(list,con) : #list为设备故障的时间段数据
|
power_supply_abnormal = [] #保存供电异常或掉线的信息
|
for item in list :
|
if is_time_not_between_key_period(item[4],item[5]) : #else:
|
temp = []
|
temp.append(item[0])
|
temp.append('设备故障')
|
temp.append('1') #疑似供电异常
|
temp.append('徐汇区')
|
temp.append(item[4])
|
temp.append(item[5])
|
power_supply_abnormal.append(temp)
|
elif is_time_between_key_period(item[4],item[5]) :
|
temp = []
|
temp.append(item[0])
|
temp.append('设备故障')
|
temp.append('2') #掉线
|
temp.append('徐汇区')
|
temp.append(item[4])
|
temp.append(item[5])
|
power_supply_abnormal.append(temp)
|
print('供电异常的数据为:')
|
for i in power_supply_abnormal :
|
print(i)
|
|
#将供电异常的信息写入数据库异常表中
|
abnormal_write_to_SQL(power_supply_abnormal,con) #将设备故障信息写入异常表
|
print('供电异常的信息写入异常表完成!')
|
|
|
|
#------------------------------------------------------------------------------------------------------------写入超标表中
|
|
#返回重组后的列表
|
def refind_ex(list): #list为网页的一条记录
|
temp=[]
|
temp.append(list[2]) #设备编号
|
temp.append(list[12]) #上报时间
|
temp.append(list[11]) #归属时间
|
temp.append(list[6]) #风机电流 6
|
temp.append(list[7]) #净化器电流7
|
temp.append(list[4]) #进油烟浓度值
|
temp.append(list[5]) #排油烟浓度值
|
|
print(temp)
|
return temp
|
|
|
#将列表写入exceeding_st_data表中
|
def ex_write_to_SQL(list,con):
|
data = pd.DataFrame(list,columns=['MV_Stat_Code','MV_Create_Time','MV_Data_Time','MV_Fan_Electricity','MV_Purifier_Electricity','MV_Fume_Concentration','MV_Fume_Concentration2'])
|
print("\n\n")
|
print(data)
|
|
# test3 要写入的数据表,这样写的话要提前在数据库建好表
|
data.to_sql(name="exceeding_st_data", con=con, if_exists="append",index=False,index_label=False)
|
print("超标表写入完成!")
|
|
|
# list为某店铺指定页数的全部的记录 将超标数据写入超标表
|
def isExceeding(list,con): #list为某店铺指定页数的全部的记录 list元素为列表形式
|
exceedingData=[] #保存超标的数据
|
for item in list: #查找超标的数据,并记录下
|
if float(item[5]) > 1: # 排烟浓度大于1则超标
|
print("该条数据超标")
|
#保存该条记录,提取需要的值,并添加其他字段
|
exceedingData.append(refind_ex(item))
|
|
|
for i in exceedingData: #遍历列表
|
print(i)
|
|
if(len(exceedingData) != 0) : #有超标数据时才执行
|
#将超标数据时间分类再写abnormal_data异常表中
|
exception(exceedingData,con)
|
|
#将超标数据直接写入数据库超标表中
|
ex_write_to_SQL(exceedingData,con)
|
else:
|
print('该店铺无超标数据')
|
|
|
#------------------------------------------------------------------------------------------------------------数据写入设备信息表
|
def generate_short_uuid():
|
arrayOf=[
|
"a",
|
"b",
|
"c",
|
"d",
|
"e",
|
"f",
|
"g",
|
"h",
|
"i",
|
"j",
|
"k",
|
"l",
|
"m",
|
"n",
|
"o",
|
"p",
|
"q",
|
"r",
|
"s",
|
"t",
|
"u",
|
"v",
|
"w",
|
"x",
|
"y",
|
"z",
|
"0",
|
"1",
|
"2",
|
"3",
|
"4",
|
"5",
|
"6",
|
"7",
|
"8",
|
"9",
|
"A",
|
"B",
|
"C",
|
"D",
|
"E",
|
"F",
|
"G",
|
"H",
|
"I",
|
"J",
|
"K",
|
"L",
|
"M",
|
"N",
|
"O",
|
"P",
|
"Q",
|
"R",
|
"S",
|
"T",
|
"U",
|
"V",
|
"W",
|
"X",
|
"Y",
|
"Z"
|
]
|
list=[]
|
ui=str(uuid.uuid4()).replace('-', '')
|
for i in range(0,16):
|
a1=ui[i*2:i*2+2]
|
x=int(a1,16)
|
list.append(arrayOf[x % 0x3E])
|
return ''.join(list)
|
|
|
#返回重组后的列表
|
def refind_ea(list): #一条记录,也就是一个列表
|
temp=[]
|
temp.append(generate_short_uuid())
|
temp.append(list[2])
|
temp.append(list[1])
|
temp.append(list[0])
|
temp.append(1)
|
print(temp)
|
return temp
|
|
#将列表写入设备信息设备信息ea_t_dev表中
|
def ea_write_to_SQL(list,con):
|
data = pd.DataFrame(list,columns=['DI_GUID','DI_Code','DI_Name','DI_Supplier','DI_Online'])
|
print("\n\n")
|
print('写入数据表 ,DateFrame为:',data)
|
|
# test3 要写入的数据表,这样写的话要提前在数据库建好表
|
data.to_sql(name="ea_t_device_info", con=con, if_exists="append",index=False,index_label=False)
|
print("设备信息表写入完成!")
|
|
|
def dev_info_data_if_exisitd(list,con): #list为爬取某家店铺指定页数转换后的数据
|
global con_read
|
df = pd.read_sql('SELECT DI_Code,DI_Name,DI_Supplier FROM ea_t_device_info',con=con_read) #从设备信息表中读取设备编号,店铺名,供应商字段的数据。返回值是DateFrame类型
|
res = df.values.tolist() #DateFrame按照行转成list类型,res存放的是设备信息表中的数据
|
print('******** 设备信息******')
|
for i in res:
|
print(i)
|
print('设备信息表记录条数为:',len(res))
|
|
list1 = rdm.remove_duplicates_dev_info(list) #设备编号,店铺名,供应商相等时,则为重复,去除。list1为去重后的
|
if len(res) > 0 : #设备表中有数据
|
#比较
|
temp=list1[:] #将list1数据给temp,遍历temp,若相等,从list中删除数据,避免一个列表同时遍历且删除
|
print('去除重复为:')
|
print(list1)
|
for item in temp:
|
if item[1:4] in ( x[:] for x in res ) : #待存入数据库的值与设备表中数据相等时,将待存入的值从list中移除
|
list1=rdm.remove_given_data_dev_info(list1,item[1:4]) #该item从list1中移除
|
|
print('设备信息表中有数据时,去重后的list为:',list1)
|
if( len(list1) != 0 ) : #删除后不为空时,写入
|
ea_write_to_SQL(list1,con) #将列表写入ea_t_dev表中
|
else : #设备表中无数据
|
# a=rdm.remove_duplicates_dev_info(list) #设备编号,店铺名,供应商相等时,则为重复,去除
|
print('设备表无数据,处理后待写入的设备信息为:',list1)
|
#将去重后数据写入设备信息表
|
ea_write_to_SQL(list1,con) #将列表写入设备表中 。 第一个参数:设备编号,店铺名,供应商相等时,则为重复,去除
|
|
|
|
#将原始数据转化成新的列表,再写入设备信息设备信息表中 /存入
|
def ea_t_dev(list,con): #某家店铺的制定页的数据记录 ,list列表元素依然为列表,比如[[1,2,3,'a'],[52,3,'a'],[6,2,3,'a']] ,con为数据库的建立
|
staging=[] #表示转换后的列表
|
for item in list:
|
#提取需要的值,并添加其他字段
|
staging.append(refind_ea(item)) #转化
|
print('设备数据转化后:')
|
for i in staging:
|
print(i)
|
|
#查询设备表已存的数据,若已存在设备信息,则不写入
|
dev_info_data_if_exisitd(staging,con)
|
|
|
#----------------------------------写入分钟数据表
|
|
#返回重组后的列表
|
def refind_fd(list): #一条记录,也就是一个列表
|
temp=[]
|
temp.append(list[2]) #设备编号
|
temp.append(list[12]) #上报时间
|
temp.append(list[11]) #归属时间
|
temp.append(list[6]) #风机电流 6
|
temp.append(list[7]) #净化器电流 7
|
temp.append(list[4]) #进油烟浓度值
|
temp.append(list[5]) #排油烟浓度值
|
temp.append(list[14]) #重复的次数
|
|
print(temp)
|
return temp
|
|
|
#将列表写入分钟数据表中
|
def fd_write_to_SQL(list,con):
|
data = pd.DataFrame(list,columns=['MV_Stat_Code','MV_Create_Time','MV_Data_Time','MV_Fan_Electricity','MV_Purifier_Electricity','MV_Fume_Concentration','MV_Fume_Concentration2','MV_Isduplication'])
|
print("写入分数数据表,DateFrame为:")
|
print(data)
|
|
# test3 要写入的数据表,这样写的话要提前在数据库建好表
|
data.to_sql(name="fd_t_minutevalue", con=con, if_exists="append",index=False,index_label=False)
|
|
print("分钟数据表写入完成!")
|
|
#转化 再写入fd_t_minbute表中
|
def fd_t_minbute(list,con): #一页的数据记录 ,con为数据库的建立
|
staging=[] #保存转换后的列表
|
for item in list:
|
#提取需要的值,并添加其他字段
|
staging.append(refind_fd(item))
|
print('分钟数据转化后:')
|
for i in staging:
|
print(i)
|
fd_write_to_SQL(staging,con) #将列表写入ea_t_dec表中
|
|
|
|
#-------------------------------------------------------------------------------------------------------------
|
|
|
def write_all(has_remove_duplicates,con): #爬取文件中所有店铺(包括特殊的url店铺) 数据库连接对象 ,要爬取的页数,开始时间,结束时间
|
is_minutes_exceed_30(has_remove_duplicates,con) # 将指定页数的设备故障数据写入数据库异常表中
|
isExceeding(has_remove_duplicates,con) # 将指定页数数据写入数据库超标表中 写入异常表中
|
ea_t_dev(has_remove_duplicates,con) # 将指定页数数据写入数据库设备信息表中
|
fd_t_minbute(has_remove_duplicates,con) #将指定页数数据写入数据库分钟数据表中
|
|
Key_period_noon_begin = datetime.strptime('10:00',"%H:%M") #中午重点时段
|
Key_period_noon_end = datetime.strptime('14:00',"%H:%M")
|
|
Key_period_night_begin = datetime.strptime('17:00',"%H:%M") #晚上重点时段
|
Key_period_night_end = datetime.strptime('21:00',"%H:%M")
|
|
|
engine = create_engine("mysql+pymysql://fumeRemote:feiyu2023@114.215.109.124:3306/fume?charset=utf8")
|
# 专门读取设备信息表
|
con_read = engine.connect()
|
|
def write(data):
|
engine = create_engine("mysql+pymysql://fumeRemote:feiyu2023@114.215.109.124:3306/fume?charset=utf8")
|
con = engine.connect()
|
write_all(data,con)
|
con.close()
|
|
|