#sum 多页 入库成功 爬取文件中所有的店铺 网页完整表端 去除重复数据 遇到空页面会跳到下一家店铺 。遇到某家店铺无数据,跳过去下一家 #爬去某家店铺指定的页数(一页记录数默认大小为100条),比如爬取12页,则爬取12页后将结果一次性写入数据库 #爬去整个页面表结构,再分别写入4张表中(爬取的数据存入两张表中,还有超标表 异常表) #网页上字段共14个,存入数据库是15个(序号+14) import requests from bs4 import BeautifulSoup as bs import re #正则表达式 from pymysql import * # 连接mysql数据库 import pandas as pd from sqlalchemy import create_engine import urllib.parse #url双重编码 import time import uuid from datetime import datetime, timedelta import sys sys.path.append('../../') import src.core_modules.remove_duplicates_methods as rdm now_date = time.strftime("%Y-%m-%d", time.localtime()) #获取当前年月日 #url编码年月日开始默认时间 now_date1 = time.strftime("%Y-%m", time.localtime()) month_begin=now_date1+'-01' #设置当前月份的开始 list_temp=[] #临时列表 全局变量 def remove_Duplicates_list(list): #列表自身去重 global already_spider_datanum list_store=[] for item in list: if item not in list_store: list_store.append(item) else: print("发现重复") already_spider_datanum=already_spider_datanum-1 #print(list_store) return list_store def merge(list): #合并list倒数六个元素 date_1=str(list.pop(-1)) #删除尾元素后还能继续使用改元素, date_2=str(list.pop(-1)) date1=date_2+' '+date_1 #合并为年月日时分秒 date_3=str(list.pop(-1)) date_4=str(list.pop(-1)) date2=date_4+' '+date_3 date_5=str(list.pop(-1)) date_6=str(list.pop(-1)) date3=date_6+' '+date_5 list.append(date3) #将合并的数据写会list列表结尾. list.append(date2) list.append(date1) return list def list_url(url,page_num): #url中的i是页 ,apge_num表示爬取的页数 。url后面加上页的参数 urls = [url+'&page'+'={}'.format(str(i)) for i in range(1,page_num+1)] return urls # 返回该url对应页的所有链接形式,返回值为列表 def get_OnePage(url,count): #抓取一页的数据,放入list_data中.urls为要访问的网页地址 global list_temp #使用全局变量 list_temp.clear() #清空临时表 r = session.get(url, verify=False).text soup = bs(r,'html.parser') # 找到所有的tr标签 rows = soup.find_all('tr') # 提取表格中的数据 result = [] #创建列表来保存结果 for row in rows: # 数据条数加1 count=count+1 # 保存表格的行数据 data = [] # 得到该行的所有列数据 cols = row.find_all('td') # 循环每一列 for col in cols: if col.find('div'): # 如果td中包含div,则单独提取其内容 div_content = col.find('div').text.strip() # 返回元素的文本内容 搜索tag的直接子节点 td_content = ''.join(col.find_all(text=True, recursive=False)).strip() data.append(td_content) data.append(div_content) else: # 如果td中不包含div,则直接提取td的内容 td_content = col.text.strip() data.append(td_content) # 删除'操作','详情' del (data[-2:]) # 删除地址 del (data[2]) result.append(data) print('一页的结果为:',result) # 删除表头的标题行 del (result[0]) count=count-1 #删除了表头,总数据的行数减一 list_temp=result[:] print('-'*10) print('获取到的数据并且处理后为:') print(list_temp) print('-'*10) # 返回获取到数据的条数 return count def get_MorePages(url,page_num): #爬取指定店铺名的多页数据,apge_num表示爬取的页数 global sleeptime global already_spider_datanum urls=list_url(url,page_num) #得到需要遍历的页的url count_all=0 #保存数据的总行数 list_all=[] #保存爬取的所有的数据 page=1 for i in urls: count=0 count_all=count_all+get_OnePage(i,count) if len(list_temp)==0: #如果该页为空,则表示该页后面都无数据 退出循环 print('后面页数为空,爬去下一个店铺') break #退出循环 list_all.extend(list_temp) #将一页数据列表追加到list_all中 print("爬取了第",page,"页") page=page+1 print("\n") time.sleep(sleeptime) #间隔2秒请求一次 for j in list_all: print(j) #打印列表中每一行 print("总行数为:",count_all) already_spider_datanum += count_all #已爬取数据的总和 return list_all def url_more(): #返回文件中铺名编码形成url,返回值是url列表 默认查看网页的最大显示条数100 global shopnum global webshops shopnames = webshops[:] #保存中文店铺名称 print('url_more:',shopnames) #编码 shopnum=len(shopnames) #文件中店铺总数 shopname_encoding=[] #保存编码后的店铺名称 i=0 for name in shopnames: shopname_encoding.append(urllib.parse.quote(urllib.parse.quote(shopnames[i]))) #店铺名称进行双重url编码 i=i+1 #拼接网址形成可用的url urls=[] #保存拼接后的url for shop in shopname_encoding: url='http://xhhb.senzly.cn/sys/yyRealTimeValue_list.jsp?key1=&shop='+shop+'&pagesize=100' urls.append(url) # for i in urls: # print(i) return urls #返回文件中店铺名称对应的url #根据开始和结束日期来拼接url def url_add_time(url,date_begin=month_begin,date_end=now_date): #url,年-月-日 2023-05-03 url_date=url+'&key5='+date_begin+'&key6='+date_end print(url_date) return url_date #------------------------------------------------------------------------------------------------------------超标油烟数据写入异常表中 #两时间是否相差10分钟 是则返回TRUE 否则返回FALSE def is_time_difference_equals_10_mins(datestr1, datestr2): date1 = datetime.strptime(datestr1, "%Y-%m-%d %H:%M") date2 = datetime.strptime(datestr2, "%Y-%m-%d %H:%M") time_diff = date2 - date1 return time_diff == timedelta(minutes = 10) or time_diff == timedelta(minutes = -10) #timedelta() 表示两个 date 对象或者 time 对象,或者 datetime 对象之间的时间间隔 #每隔十分钟一次为正常。 找出超过10分钟的间断点 def find_break_point(list): #list为超标数据的列表 i=0 j=1 break_point = [] #保存间断点 for item in list[1:]: if(is_time_difference_equals_10_mins(list[i][2],item[2]) == False): break_point.append(j) i=i+1 j=j+1 print('间断点为:') print(break_point) #写入间断点 return break_point #根据间断点将列表分割成几个子列表,由result返回 def point_write(list,b_point): #list为列表。b_point列表元素为间断点,间断点值从小到大 result = [] last_index = 0 for index in b_point: result.append(list[last_index:index]) #灵活 last_index=index result.append(list[last_index:]) return result #将设备故障信息写入abnormal_data异常表中 def abnormal_write_to_SQL(list,con): data = pd.DataFrame(list,columns=['dev_id','exception','exception_type','region','begin_time','end_time']) print("\n\n") print(data) # test3 要写入的数据表,这样写的话要提前在数据库建好表 data.to_sql(name="abnormal_data", con=con, if_exists="append",index=False,index_label=False) def exception(list,con): #list为超标数据的列表 break_point=find_break_point(list) #返回间断点 split_list=point_write(list,break_point) #根据间断点将原始列表分割成几个子列表 split_list为三层数组,形式为[[[1,2],[4,'g']],[[8,'2'],['4','g']],[[1,2],[4,'g']]] # print('超标时间段划分成的子列表为::') # for i in split_list: # print(i) print('\n') abnormal=[] #重组好的异常表数据 for item in split_list: #从分割的数组中提取需要的时间信息,并添加新的信息数据 temp=[] temp.append(item[0][0]) #设备编号 temp.append('数据异常') #设备编号 temp.append('0') #油烟浓度超标 temp.append('徐汇区') temp.append(item[len(item)-1][2]) #前一条记录的归属时间 开始时间 temp.append(item[0][2]) #归属时间 结束时间 abnormal.append(temp) print(abnormal) print('超标异常时间段数据为:') for j in abnormal: print(j) abnormal_write_to_SQL(abnormal,con) #写入异常表中 print("超标油烟数据异常表写入完成!") #------------------------------------------------------------------------------------------------------------设备故障数据写入异常表中 #两时间是否相差30分钟 是则返回TRUE 否则返回FALSE def is_time_difference_equals_30_mins(datestr1, datestr2): date1 = datetime.strptime(datestr1, "%Y-%m-%d %H:%M") date2 = datetime.strptime(datestr2, "%Y-%m-%d %H:%M") time_diff = date2 - date1 return time_diff > timedelta(minutes=30) #找出设备故障的信息,并将此信息写入异常表中 def is_minutes_exceed_30(list,con) : # list为某店铺指定页数的全部的记录 list元素中的时间为倒序排列,即从大到小 device_failure=[] #存储设备故障的数据 startTime = list[0][11] print('开始时间:',startTime) for item in list[1:] : if is_time_difference_equals_30_mins(item[11],startTime) : #必须大于30分钟 不能等于30分钟 temp=[] temp.append(item[2]) #设备编号 temp.append('设备故障') #设备编号 temp.append('1') #设备故障 temp.append('徐汇区') temp.append(item[11]) #故障开始时间 startTimeSub= datetime.strptime(startTime,"%Y-%m-%d %H:%M") - timedelta(minutes = 10) #结果为datetime.datetime类型 ,需要再转为字符串类型 print('相减后结果:',str(startTimeSub)) print('相减后类型:',type(str(startTimeSub))) temp.append(str(startTimeSub)[:16]) #故障结束时间 device_failure.append(temp) startTime = item[11] print('设备故障的数据为:') for i in device_failure : print(i) not_Key_period_exceed_30_minutes(device_failure,con) #将供电异常信息写入异常表 #abnormal_write_to_SQL(device_failure,con) #将设备故障信息写入异常表 print('供电异常/掉线信息写入异常表完成!') #-----------------------------------------------------------------------------------------------------------供电异常数据写入异常表中 #开始和结束时间都处于非重点时段时,返回true def is_time_not_between_key_period(begin_time,end_time) : #形参为日期字符串,形如 '2023-06-21 14:30' global Key_period_noon_begin,Key_period_noon_end,Key_period_night_begin,Key_period_night_end # #中午重点时段 # Key_period_noon_begin = datetime.strptime('10:00',"%H:%M") # Key_period_noon_end = datetime.strptime('14:00',"%H:%M") # #晚上重点时段 # Key_period_night_begin = datetime.strptime('17:00',"%H:%M") # Key_period_night_end = datetime.strptime('21:00',"%H:%M") begin1 = datetime.strptime(begin_time[11:],"%H:%M") end1 = datetime.strptime(end_time[11:],"%H:%M") #当开始和结束时间都处于非重点时段时,将该条故障信息同时记录为: 疑似供电异常 if ((( begin1 > Key_period_noon_begin and begin1 < Key_period_noon_end ) or ( begin1 > Key_period_night_begin and begin1 < Key_period_night_end )) or (( end1 > Key_period_noon_begin and end1 < Key_period_noon_end ) or ( end1 > Key_period_night_begin and end1 < Key_period_night_end ))) ==False : print('开始或结束时间时间在非重点时段') return True print('处于重点时段') return False #开始和结束时间都处于重点时段时,返回true def is_time_between_key_period(begin_time,end_time) : #形参为日期字符串,形如 '2023-06-21 14:30' global Key_period_noon_begin,Key_period_noon_end,Key_period_night_begin,Key_period_night_end # #中午重点时段 # Key_period_noon_begin = datetime.strptime('10:00',"%H:%M") # Key_period_noon_end = datetime.strptime('14:00',"%H:%M") # #晚上重点时段 # Key_period_night_begin = datetime.strptime('17:00',"%H:%M") # Key_period_night_end = datetime.strptime('21:00',"%H:%M") begin1 = datetime.strptime(begin_time[11:],"%H:%M") end1 = datetime.strptime(end_time[11:],"%H:%M") #当开始和结束时间都处于重点时段时,将该条故障信息同时记录为: 掉线 if ((begin1 > Key_period_noon_begin and begin1 < Key_period_noon_end) and ( end1 > Key_period_noon_begin and end1 < Key_period_noon_end )) or ( (begin1 > Key_period_night_begin and begin1 < Key_period_night_end) and ( end1 > Key_period_night_begin and end1 < Key_period_night_end )) : print('开始或结束时间处于重点时段') return True print('处于非重点时段') return False def not_Key_period_exceed_30_minutes(list,con) : #list为设备故障的时间段数据 power_supply_abnormal = [] #保存供电异常或掉线的信息 for item in list : if is_time_not_between_key_period(item[4],item[5]) : #else: temp = [] temp.append(item[0]) temp.append('设备故障') temp.append('1') #疑似供电异常 temp.append('徐汇区') temp.append(item[4]) temp.append(item[5]) power_supply_abnormal.append(temp) elif is_time_between_key_period(item[4],item[5]) : temp = [] temp.append(item[0]) temp.append('设备故障') temp.append('2') #掉线 temp.append('徐汇区') temp.append(item[4]) temp.append(item[5]) power_supply_abnormal.append(temp) print('供电异常的数据为:') for i in power_supply_abnormal : print(i) #将供电异常的信息写入数据库异常表中 abnormal_write_to_SQL(power_supply_abnormal,con) #将设备故障信息写入异常表 print('供电异常的信息写入异常表完成!') #------------------------------------------------------------------------------------------------------------写入超标表中 #返回重组后的列表 def refind_ex(list): #list为网页的一条记录 temp=[] temp.append(list[2]) #设备编号 temp.append(list[12]) #上报时间 temp.append(list[11]) #归属时间 temp.append(list[6]) #风机电流 6 temp.append(list[7]) #净化器电流7 temp.append(list[4]) #进油烟浓度值 temp.append(list[5]) #排油烟浓度值 print(temp) return temp #将列表写入exceeding_st_data表中 def ex_write_to_SQL(list,con): data = pd.DataFrame(list,columns=['MV_Stat_Code','MV_Create_Time','MV_Data_Time','MV_Fan_Electricity','MV_Purifier_Electricity','MV_Fume_Concentration','MV_Fume_Concentration2']) print("\n\n") print(data) #engine = create_engine("mysql+pymysql://root:1234@localhost:3306/fume?charset=utf8") #con = engine.connect() # test3 要写入的数据表,这样写的话要提前在数据库建好表 data.to_sql(name="exceeding_st_data", con=con, if_exists="append",index=False,index_label=False) #con.close() print("超标表写入完成!") # list为某店铺指定页数的全部的记录 将超标数据写入超标表 def isExceeding(list,con): #list为某店铺指定页数的全部的记录 list元素为列表形式 exceedingData=[] #保存超标的数据 for item in list: #查找超标的数据,并记录下 if float(item[5]) > 1: # 排烟浓度大于1则超标 print("该条数据超标") #保存该条记录,提取需要的值,并添加其他字段 exceedingData.append(refind_ex(item)) for i in exceedingData: #遍历列表 print(i) if(len(exceedingData) != 0) : #有超标数据时才执行 #将超标数据时间分类再写abnormal_data异常表中 exception(exceedingData,con) #将超标数据直接写入数据库超标表中 ex_write_to_SQL(exceedingData,con) else: print('该店铺无超标数据') #------------------------------------------------------------------------------------------------------------数据写入设备信息表 def generate_short_uuid(): arrayOf=[ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" ] list=[] ui=str(uuid.uuid4()).replace('-', '') for i in range(0,16): a1=ui[i*2:i*2+2] x=int(a1,16) list.append(arrayOf[x % 0x3E]) return ''.join(list) #返回重组后的列表 def refind_ea(list): #一条记录,也就是一个列表 temp=[] temp.append(generate_short_uuid()) temp.append(list[2]) temp.append(list[1]) temp.append(list[0]) temp.append(1) print(temp) return temp #将列表写入设备信息设备信息ea_t_dev表中 def ea_write_to_SQL(list,con): data = pd.DataFrame(list,columns=['DI_GUID','DI_Code','DI_Name','DI_Supplier','DI_Online']) print("\n\n") print('写入数据表 ,DateFrame为:',data) # test3 要写入的数据表,这样写的话要提前在数据库建好表 data.to_sql(name="ea_t_device_info", con=con, if_exists="append",index=False,index_label=False) print("设备信息表写入完成!") def dev_info_data_if_exisitd(list,con): #list为爬取某家店铺指定页数转换后的数据 global con_read #创建第二个数据库连接 # engine = create_engine("mysql+pymysql://root:1234@localhost:3306/fume?charset=utf8") # con_read = engine.connect() df = pd.read_sql('SELECT DI_Code,DI_Name,DI_Supplier FROM ea_t_device_info',con=con_read) #从设备信息表中读取设备编号,店铺名,供应商字段的数据。返回值是DateFrame类型 # con_read.close() #关闭链接 res = df.values.tolist() #DateFrame按照行转成list类型,res存放的是设备信息表中的数据 print('******** 设备信息******') for i in res: print(i) print('设备信息表记录条数为:',len(res)) list1 = rdm.remove_duplicates_dev_info(list) #设备编号,店铺名,供应商相等时,则为重复,去除。list1为去重后的 if len(res) > 0 : #设备表中有数据 #比较 temp=list1[:] #将list1数据给temp,遍历temp,若相等,从list中删除数据,避免一个列表同时遍历且删除 print('去除重复为:') print(list1) for item in temp: if item[1:4] in ( x[:] for x in res ) : #待存入数据库的值与设备表中数据相等时,将待存入的值从list中移除 list1=rdm.remove_given_data_dev_info(list1,item[1:4]) #该item从list1中移除 print('设备信息表中有数据时,去重后的list为:',list1) if( len(list1) != 0 ) : #删除后不为空时,写入 ea_write_to_SQL(list1,con) #将列表写入ea_t_dev表中 else : #设备表中无数据 # a=rdm.remove_duplicates_dev_info(list) #设备编号,店铺名,供应商相等时,则为重复,去除 print('设备表无数据,处理后待写入的设备信息为:',list1) #将去重后数据写入设备信息表 ea_write_to_SQL(list1,con) #将列表写入设备表中 。 第一个参数:设备编号,店铺名,供应商相等时,则为重复,去除 #将原始数据转化成新的列表,再写入设备信息设备信息表中 /存入 def ea_t_dev(list,con): #某家店铺的制定页的数据记录 ,list列表元素依然为列表,比如[[1,2,3,'a'],[52,3,'a'],[6,2,3,'a']] ,con为数据库的建立 staging=[] #表示转换后的列表 for item in list: #提取需要的值,并添加其他字段 staging.append(refind_ea(item)) #转化 print('设备数据转化后:') for i in staging: print(i) #查询设备表已存的数据,若已存在设备信息,则不写入 dev_info_data_if_exisitd(staging,con) #----------------------------------写入分钟数据表 #返回重组后的列表 def refind_fd(list): #一条记录,也就是一个列表 temp=[] temp.append(list[2]) #设备编号 temp.append(list[12]) #上报时间 temp.append(list[11]) #归属时间 temp.append(list[6]) #风机电流 6 temp.append(list[7]) #净化器电流 7 temp.append(list[4]) #进油烟浓度值 temp.append(list[5]) #排油烟浓度值 print(temp) return temp #将列表写入分钟数据表中 def fd_write_to_SQL(list,con): data = pd.DataFrame(list,columns=['MV_Stat_Code','MV_Create_Time','MV_Data_Time','MV_Fan_Electricity','MV_Purifier_Electricity','MV_Fume_Concentration','MV_Fume_Concentration2']) print("写入分数数据表,DateFrame为:") print(data) # test3 要写入的数据表,这样写的话要提前在数据库建好表 data.to_sql(name="fd_t_minutevalue", con=con, if_exists="append",index=False,index_label=False) print("分钟数据表写入完成!") #转化 再写入fd_t_minbute表中 def fd_t_minbute(list,con): #一页的数据记录 ,con为数据库的建立 staging=[] #保存转换后的列表 for item in list: #提取需要的值,并添加其他字段 staging.append(refind_fd(item)) print('分钟数据转化后:') for i in staging: print(i) fd_write_to_SQL(staging,con) #将列表写入ea_t_dec表中 #--------------------------------------------------------------------------------------------------------------食其家 def get_OnePage_teshu_shiqijia(url,count): global ck global list_temp #使用全局变量 list_temp.clear() #清空临时表 # session.headers = { # # 此处注意cookie,要自己抓取 # # "Cookie":ck, # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36", # } r = session.get(url, verify=False).text soup = bs(r,'html.parser') list=[] #创建列表来保存结果 tags = soup.find_all("tr") # 列表所有行 for tag in tags: # 每个tag是一行 count=count+1 element = tag.text # 获取