For one of the big assignments in my practical training in my junior year (2020), I put together a data collection, visualization, and analysis system for the COVID-19 outbreak. Roughly: first find the data, import it into Hive, clean it with Hive, export the cleaned data into MySQL via HQL, then build the back-end data interfaces with SSM, and finally visualize the data on the front end with ECharts and tables. For details see: https:///article/. The main requirement at the time was to process the data with Hive, but since the data came from someone else's ready-made API, processing it with Hive and then loading it into the database was honestly overkill. So only the data-processing step was a bit off; the rest of the processing and the visualization held up well.
This time a friend also wants to build an epidemic data collection and visualization system, wants to model it on what I did before, and asked me for pointers. Here is the problem: back then the data volume was small and could be fetched directly from free public APIs. Now that the epidemic has run for more than two years, organizing the daily historical data for every province and city means a fairly large dataset, and there is almost no ready-made API that covers this. So I did the following work to obtain and process the data:
1. Access to data
The data source is a GitHub project I have had bookmarked for a long time:/nCoV/
Data warehouse link:/BlankerL/DXY-COVID-19-Data/releases
The project additionally deploys a data warehouse; every day at 0:00 the program runs on schedule and pushes the data into a Release.
So we can download ready-made crawled data straight from that author's data warehouse. Downloading it in CSV format is the most convenient for further processing.
After downloading and opening it, you will find nearly a million rows in this 92 MB file. Reading it in one go is a bit slow.
That is when I thought of reading the data in chunks with Python's pandas, a tool that is very easy to work with and extremely fast at reading data.
2. Reading the CSV with Python
For reading the CSV I chose the pandas module; reading it natively is very, very slow!
Note: the .py script and the CSV file are placed in the same directory.
```python
import pandas as pd
import numpy as np

# File to read (path left blank in the original; point it at your CSV)
filePath = ""

# Getting data: read the CSV in chunks and concatenate into one DataFrame
def read_csv_feature(filePath):
    # Read the file
    f = open(filePath, encoding='utf-8')
    reader = pd.read_csv(f, sep=',', iterator=True)
    loop = True
    chunkSize = 1000000
    chunks = []
    while loop:
        try:
            chunk = reader.get_chunk(chunkSize)
            chunks.append(chunk)
        except StopIteration:
            loop = False
    df = pd.concat(chunks, axis=0, ignore_index=True)
    f.close()
    return df

data = read_csv_feature(filePath)
print('Data read successfully ---------------')
```
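As an aside, recent pandas versions can do the same chunked read without the manual StopIteration loop, via read_csv's chunksize parameter. A minimal sketch on an in-memory CSV (the file, separator, and columns here are made up for illustration):

```python
import io
import pandas as pd

# A tiny in-memory CSV standing in for the big downloaded file
csv_text = "a,b\n1,2\n3,4\n5,6\n"

# chunksize makes read_csv return an iterator of DataFrames
chunks = pd.read_csv(io.StringIO(csv_text), chunksize=2)
df = pd.concat(chunks, ignore_index=True)
print(len(df))  # 3 rows, read in chunks of 2
```

The result is the same DataFrame as a plain read; only the memory profile during reading differs.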
Once the CSV is read successfully, everything lives in data, which is a pandas DataFrame.
You can use the NumPy module to pull the needed columns out of the DataFrame into arrays, which makes the data easy to operate on:
```python
countryName = np.array(data["countryName"])
countryEnglishName = np.array(data["countryEnglishName"])
provinceName = np.array(data["provinceName"])
province_confirmedCount = np.array(data["province_confirmedCount"])
province_curedCount = np.array(data["province_curedCount"])
province_deadCount = np.array(data["province_deadCount"])
updateTime = np.array(data["updateTime"])
cityName = np.array(data["cityName"])
city_confirmedCount = np.array(data["city_confirmedCount"])
city_curedCount = np.array(data["city_curedCount"])
city_deadCount = np.array(data["city_deadCount"])
```
This extracts all the columns we need.
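To make concrete what those np.array(...) lines give you: each column becomes a plain NumPy array that the cleaning loop later indexes by row position. A toy example (the two columns and values here are illustrative, not from the real dataset):

```python
import numpy as np
import pandas as pd

# Miniature stand-in for the real DataFrame
df = pd.DataFrame({
    "provinceName": ["Hubei", "Hubei", "Guangdong"],
    "province_confirmedCount": [100, 120, 50],
})

provinceName = np.array(df["provinceName"])
counts = np.array(df["province_confirmedCount"])

# Positional indexing, the same access pattern as the cleaning loop
for i in range(len(df)):
    print(provinceName[i], counts[i])
```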
3. Data cleansing with Python
For cleaning I am still using the dumb method here: straightforward brute force, loading the data into the corresponding lists:
```python
# National historical data
historyed = list()
# Latest national figures
totaled = list()
# Latest per-province data
provinceed = list()
# Latest per-city (area) data
areaed = list()
# Helper list: one entry per province, used to build provinceed
areaed_tmp = list()

for i in range(len(data)):
    if countryName[i] == "China":
        updatetimeList = str(updateTime[i]).split(' ')
        updatetime = updatetimeList[0]
        # Handle historyed (one country-level row per day)
        historyed_temp = list()
        if provinceName[i] == "China":
            # Handle totaled: the first country-level row is the latest snapshot
            if len(totaled) == 0:
                totaled.append(str(updateTime[i]))
                totaled.append(int(province_confirmedCount[i]))
                totaled.append(int(province_curedCount[i]))
                totaled.append(int(province_deadCount[i]))
            if (len(historyed) > 0) and (str(updatetime) != historyed[len(historyed) - 1][0]):
                historyed_temp.append(str(updatetime))
                historyed_temp.append(int(province_confirmedCount[i]))
                historyed_temp.append(int(province_curedCount[i]))
                historyed_temp.append(int(province_deadCount[i]))
            if len(historyed) == 0:
                historyed_temp.append(str(updatetime))
                historyed_temp.append(int(province_confirmedCount[i]))
                historyed_temp.append(int(province_curedCount[i]))
                historyed_temp.append(int(province_deadCount[i]))
            if len(historyed_temp) > 0:
                historyed.append(historyed_temp)
        # Handle areaed (latest row per city)
        areaed_temp = list()
        if provinceName[i] != "China":
            # Shorten the province name (the cut-offs match the Chinese names in the raw data)
            if provinceName[i] != "Inner * Autonomous Region" and provinceName[i] != "Heilongjiang Province":
                provinceName[i] = provinceName[i][0:2]
            else:
                provinceName[i] = provinceName[i][0:3]
            flag = 1
            for item in areaed:
                if item[1] == str(cityName[i]):
                    flag = 0
            if flag == 1:
                areaed_temp.append(str(provinceName[i]))
                areaed_temp.append(str(cityName[i]))
                # Some rows carry NaN counts; treat them as 0
                areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
                areaed.append(areaed_temp)
            flag = 1
            for item in areaed_tmp:
                if item[0] == str(provinceName[i]):
                    flag = 0
            if flag == 1:
                areaed_temp = list()
                areaed_temp.append(str(provinceName[i]))
                areaed_temp.append(str(cityName[i]))
                areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
                areaed_tmp.append(areaed_temp)

# Handle provinceed (needs to be built from areaed_tmp)
province_temp = list()
for temp in areaed_tmp:
    if len(provinceed) == 0 and len(province_temp) == 0:
        province_temp.append(temp[0])
        province_temp.append(temp[2])
        province_temp.append(temp[3])
        province_temp.append(temp[4])
    else:
        if temp[0] == province_temp[0]:
            # Accumulate counts for the same province
            province_temp[1] = province_temp[1] + temp[2]
            province_temp[2] = province_temp[2] + temp[3]
            province_temp[3] = province_temp[3] + temp[4]
        else:
            provinceed.append(province_temp)
            province_temp = list()
            province_temp.append(temp[0])
            province_temp.append(temp[2])
            province_temp.append(temp[3])
            province_temp.append(temp[4])
provinceed.append(province_temp)

print('Data cleansing successful ---------------')
```
There is nothing clever here, it is pure manual labor: the columns extracted above are cleaned one row at a time. The thing to watch is the format of the data you read out; some of it is not very standard and has to be handled by hand.
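As an aside, much of this row-by-row work can also be pushed into pandas itself, e.g. keeping only each city's most recent row and filling missing counts with 0. A sketch on synthetic data (the column names follow the DXY CSV; everything else is made up for illustration, and this is an alternative, not the author's method):

```python
import pandas as pd

# Synthetic rows in DXY style: multiple snapshots per city, some counts missing
df = pd.DataFrame({
    "provinceName": ["Hubei", "Hubei", "Guangdong"],
    "cityName": ["Wuhan", "Wuhan", "Guangzhou"],
    "city_confirmedCount": [120.0, 100.0, None],  # None -> NaN after load
    "updateTime": ["2022-05-03 10:00", "2022-05-02 10:00", "2022-05-03 09:00"],
})

latest = (df.sort_values("updateTime", ascending=False)
            .drop_duplicates("cityName")          # keep the newest row per city
            .fillna({"city_confirmedCount": 0}))  # NaN counts -> 0
print(latest[["cityName", "city_confirmedCount"]])
```

This replaces the manual "flag" dedup loop with sort_values + drop_duplicates, at the cost of being less explicit about each step.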
4. Automatically importing the cleaned data into MySQL
Importing the data into MySQL is also done in Python, using the pymysql module.
```python
import pymysql

"""
Importing data into a database
"""
# Open a database connection (connect without a default database first,
# since the yq database may not exist yet)
db = pymysql.connect(host="localhost", user="root", password="123456")
# Use the cursor() method to create a cursor object
cursor = db.cursor()

# Create the yq database and switch to it
cursor.execute('CREATE DATABASE IF NOT EXISTS yq DEFAULT CHARSET utf8 COLLATE utf8_general_ci;')
cursor.execute('USE yq')
print('Created yq database successfully')

# Create the related tables
cursor.execute('drop table if exists areaed')
sql = """
CREATE TABLE IF NOT EXISTS `areaed` (
  `provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `cityName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `confirmedCount` int(11) NULL DEFAULT NULL,
  `deadCount` int(11) NULL DEFAULT NULL,
  `curedCount` int(11) NULL DEFAULT NULL,
  `currentCount` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)

cursor.execute('drop table if exists provinceed')
sql = """
CREATE TABLE `provinceed` (
  `provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `confirmedNum` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `deathsNum` int(11) NULL DEFAULT NULL,
  `curesNum` int(11) NULL DEFAULT NULL,
  `currentNum` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)

cursor.execute('drop table if exists totaled')
sql = """
CREATE TABLE `totaled` (
  `date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `diagnosed` int(11) NULL DEFAULT NULL,
  `death` int(11) NULL DEFAULT NULL,
  `cured` int(11) NULL DEFAULT NULL,
  `current` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)

cursor.execute('drop table if exists historyed')
sql = """
CREATE TABLE `historyed` (
  `date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `confirmedNum` int(11) NULL DEFAULT NULL,
  `deathsNum` int(11) NULL DEFAULT NULL,
  `curesNum` int(11) NULL DEFAULT NULL,
  `currentNum` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
print('Created related table successfully')

# Import historyed
for item in historyed:
    sql = 'INSERT INTO historyed VALUES(%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (str(item[0]), item[1], item[3], item[2], item[1] - item[2] - item[3]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing historyed success -------------")

# Import areaed
for item in areaed:
    sql = 'INSERT INTO areaed VALUES(%s,%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (item[0], item[1], item[2], item[4], item[3], item[2] - item[3] - item[4]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing areaed success -------------")

# Import provinceed
for item in provinceed:
    sql = 'INSERT INTO provinceed VALUES(%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (str(item[0]), item[1], item[3], item[2], item[1] - item[2] - item[3]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing provinceed success -------------")

# Import totaled
sql = 'INSERT INTO totaled VALUES(%s,%s,%s,%s,%s)'
try:
    cursor.execute(sql, (str(totaled[0]), totaled[1], totaled[3], totaled[2], totaled[1] - totaled[2] - totaled[3]))
    db.commit()
except Exception as ex:
    print("error:")
    print("The following exception %s occurred" % ex)
    db.rollback()
print("Importing totaled success -------------")

cursor.close()  # Close the cursor first
db.close()      # Then close the database connection
```
To keep the script easy to use, it first creates the database, then the tables, and finally imports the cleaned data into MySQL.
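The same build-then-insert pattern can also batch rows with executemany instead of committing one row at a time. The sketch below uses sqlite3 as an in-memory stand-in for MySQL so it runs without a server (sqlite uses ? placeholders where pymysql uses %s; table and values are illustrative):

```python
import sqlite3

# Cleaned rows in the historyed layout: date, confirmed, cured, dead
historyed = [["2022-05-01", 100, 90, 5],
             ["2022-05-02", 110, 95, 6]]

db = sqlite3.connect(":memory:")
cursor = db.cursor()
cursor.execute(
    "CREATE TABLE historyed ("
    "date TEXT, confirmedNum INT, deathsNum INT, curesNum INT, currentNum INT)")

# One executemany call and one commit instead of a commit per row
rows = [(d, c, dead, cured, c - cured - dead) for d, c, cured, dead in historyed]
cursor.executemany("INSERT INTO historyed VALUES (?,?,?,?,?)", rows)
db.commit()

cursor.execute("SELECT currentNum FROM historyed ORDER BY date")
print(cursor.fetchall())  # remaining active cases per day
```

Batching is noticeably faster for large row counts; the per-row try/commit in the script above trades speed for pinpointing exactly which row failed.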
Full Code
```python
import pandas as pd
import numpy as np
import pymysql

"""
@ProjectName: cleanData
@FileName:
@Author: tao
@Date: 2022/05/03
"""

# File to read (path left blank in the original)
filePath = ""

# National historical data
historyed = list()
# Latest national figures
totaled = list()
# Latest per-province data
provinceed = list()
# Latest per-city (area) data
areaed = list()

# Getting data
def read_csv_feature(filePath):
    # Read the file in chunks and concatenate into one DataFrame
    f = open(filePath, encoding='utf-8')
    reader = pd.read_csv(f, sep=',', iterator=True)
    loop = True
    chunkSize = 1000000
    chunks = []
    while loop:
        try:
            chunk = reader.get_chunk(chunkSize)
            chunks.append(chunk)
        except StopIteration:
            loop = False
    df = pd.concat(chunks, axis=0, ignore_index=True)
    f.close()
    return df

data = read_csv_feature(filePath)
print('Data read successfully ---------------')

areaed_tmp = list()

countryName = np.array(data["countryName"])
countryEnglishName = np.array(data["countryEnglishName"])
provinceName = np.array(data["provinceName"])
province_confirmedCount = np.array(data["province_confirmedCount"])
province_curedCount = np.array(data["province_curedCount"])
province_deadCount = np.array(data["province_deadCount"])
updateTime = np.array(data["updateTime"])
cityName = np.array(data["cityName"])
city_confirmedCount = np.array(data["city_confirmedCount"])
city_curedCount = np.array(data["city_curedCount"])
city_deadCount = np.array(data["city_deadCount"])

for i in range(len(data)):
    if countryName[i] == "China":
        updatetimeList = str(updateTime[i]).split(' ')
        updatetime = updatetimeList[0]
        # Handle historyed
        historyed_temp = list()
        if provinceName[i] == "China":
            # Handle totaled
            if len(totaled) == 0:
                totaled.append(str(updateTime[i]))
                totaled.append(int(province_confirmedCount[i]))
                totaled.append(int(province_curedCount[i]))
                totaled.append(int(province_deadCount[i]))
            if (len(historyed) > 0) and (str(updatetime) != historyed[len(historyed) - 1][0]):
                historyed_temp.append(str(updatetime))
                historyed_temp.append(int(province_confirmedCount[i]))
                historyed_temp.append(int(province_curedCount[i]))
                historyed_temp.append(int(province_deadCount[i]))
            if len(historyed) == 0:
                historyed_temp.append(str(updatetime))
                historyed_temp.append(int(province_confirmedCount[i]))
                historyed_temp.append(int(province_curedCount[i]))
                historyed_temp.append(int(province_deadCount[i]))
            if len(historyed_temp) > 0:
                historyed.append(historyed_temp)
        # Handle areaed
        areaed_temp = list()
        if provinceName[i] != "China":
            if provinceName[i] != "Inner * Autonomous Region" and provinceName[i] != "Heilongjiang Province":
                provinceName[i] = provinceName[i][0:2]
            else:
                provinceName[i] = provinceName[i][0:3]
            flag = 1
            for item in areaed:
                if item[1] == str(cityName[i]):
                    flag = 0
            if flag == 1:
                areaed_temp.append(str(provinceName[i]))
                areaed_temp.append(str(cityName[i]))
                # Some rows carry NaN counts; treat them as 0
                areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
                areaed.append(areaed_temp)
            flag = 1
            for item in areaed_tmp:
                if item[0] == str(provinceName[i]):
                    flag = 0
            if flag == 1:
                areaed_temp = list()
                areaed_temp.append(str(provinceName[i]))
                areaed_temp.append(str(cityName[i]))
                areaed_temp.append(int(0 if np.isnan(city_confirmedCount[i]) else city_confirmedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_curedCount[i]) else city_curedCount[i]))
                areaed_temp.append(int(0 if np.isnan(city_deadCount[i]) else city_deadCount[i]))
                areaed_tmp.append(areaed_temp)

# Handle provinceed (needs to be built from areaed_tmp)
province_temp = list()
for temp in areaed_tmp:
    if len(provinceed) == 0 and len(province_temp) == 0:
        province_temp.append(temp[0])
        province_temp.append(temp[2])
        province_temp.append(temp[3])
        province_temp.append(temp[4])
    else:
        if temp[0] == province_temp[0]:
            province_temp[1] = province_temp[1] + temp[2]
            province_temp[2] = province_temp[2] + temp[3]
            province_temp[3] = province_temp[3] + temp[4]
        else:
            provinceed.append(province_temp)
            province_temp = list()
            province_temp.append(temp[0])
            province_temp.append(temp[2])
            province_temp.append(temp[3])
            province_temp.append(temp[4])
provinceed.append(province_temp)

print('Data cleansing successful ---------------')

# print(historyed)
# print(areaed)
print(totaled)
# print(provinceed)
"""
print(len(provinceed))
for item in provinceed:
    print(item[1]-item[2]-item[3])
"""

"""
Importing data into a database
"""
# Open a database connection (no default database yet)
db = pymysql.connect(host="localhost", user="root", password="123456")
# Use the cursor() method to create a cursor object
cursor = db.cursor()

# Create the yq database and switch to it
cursor.execute('CREATE DATABASE IF NOT EXISTS yq DEFAULT CHARSET utf8 COLLATE utf8_general_ci;')
cursor.execute('USE yq')
print('Created yq database successfully')

# Create the related tables
cursor.execute('drop table if exists areaed')
sql = """
CREATE TABLE IF NOT EXISTS `areaed` (
  `provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `cityName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `confirmedCount` int(11) NULL DEFAULT NULL,
  `deadCount` int(11) NULL DEFAULT NULL,
  `curedCount` int(11) NULL DEFAULT NULL,
  `currentCount` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)

cursor.execute('drop table if exists provinceed')
sql = """
CREATE TABLE `provinceed` (
  `provinceName` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `confirmedNum` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `deathsNum` int(11) NULL DEFAULT NULL,
  `curesNum` int(11) NULL DEFAULT NULL,
  `currentNum` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
"""
cursor.execute(sql)

cursor.execute('drop table if exists totaled')
sql = """
CREATE TABLE `totaled` (
  `date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `diagnosed` int(11) NULL DEFAULT NULL,
  `death` int(11) NULL DEFAULT NULL,
  `cured` int(11) NULL DEFAULT NULL,
  `current` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)

cursor.execute('drop table if exists historyed')
sql = """
CREATE TABLE `historyed` (
  `date` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `confirmedNum` int(11) NULL DEFAULT NULL,
  `deathsNum` int(11) NULL DEFAULT NULL,
  `curesNum` int(11) NULL DEFAULT NULL,
  `currentNum` int(11) NULL DEFAULT NULL
) ENGINE = MyISAM CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
"""
cursor.execute(sql)
print('Created related table successfully')

# Import historyed
for item in historyed:
    sql = 'INSERT INTO historyed VALUES(%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (str(item[0]), item[1], item[3], item[2], item[1] - item[2] - item[3]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing historyed success -------------")

# Import areaed
for item in areaed:
    sql = 'INSERT INTO areaed VALUES(%s,%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (item[0], item[1], item[2], item[4], item[3], item[2] - item[3] - item[4]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing areaed success -------------")

# Import provinceed
for item in provinceed:
    sql = 'INSERT INTO provinceed VALUES(%s,%s,%s,%s,%s)'
    try:
        cursor.execute(sql, (str(item[0]), item[1], item[3], item[2], item[1] - item[2] - item[3]))
        db.commit()
    except Exception as ex:
        print("error:")
        print("The following exception %s occurred" % ex)
        db.rollback()
        break
print("Importing provinceed success -------------")

# Import totaled
sql = 'INSERT INTO totaled VALUES(%s,%s,%s,%s,%s)'
try:
    cursor.execute(sql, (str(totaled[0]), totaled[1], totaled[3], totaled[2], totaled[1] - totaled[2] - totaled[3]))
    db.commit()
except Exception as ex:
    print("error:")
    print("The following exception %s occurred" % ex)
    db.rollback()
print("Importing totaled success -------------")

cursor.close()  # Close the cursor first
db.close()      # Then close the database connection
```
Script run results
After running, the following tables and data can be seen in the database.
At this point the data is ready. The data format still follows the one used in my earlier COVID-19 data collection, visualization, and analysis system, so the back end and the visualization can be hooked up as described there:/?p=514
That concludes this article on cleaning historical epidemic data with Python. For more related content, search my earlier articles or keep browsing the related articles below. I hope you will continue to support me!