This time we mainly share a solution for implementing a real-time incremental data loading tool, built around a real-world use case of the singleton pattern. The key is to realize an incremental ID record table that supports operations such as add, modify, and delete.
Singleton pattern: provides a global access point and ensures that a class has one and only one object of a particular type. It is typically used in scenarios such as logging or database operations, to avoid conflicting requests for a single resource.
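As a minimal illustration of this definition (a sketch, not part of the tool itself; the Logger class name is made up for the example), two instantiations of a singleton return the same object:

```python
# Minimal singleton sketch: __new__ always returns the one shared instance.
class Logger:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

log1 = Logger()
log2 = Logger()
print(log1 is log2)  # True: there is only one Logger object
```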
Creating an Incremental ID Record Table
```python
import sqlite3
import datetime
import pymssql
import pandas as pd
import time

pd.set_option('expand_frame_repr', False)
```
Import the required modules
```python
# Create the data table
database_path = r'.\Database\ID_Record.db'
from sqlite3 import connect

with connect(database_path) as conn:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record('
        'id INTEGER PRIMARY KEY AUTOINCREMENT,'
        'F_SDaqID_MAX TEXT,'
        'record_date datetime)')
```
Incremental Latest Record ID (F_SDaqID_MAX): Database Storage
```python
# Save data to a local txt file
def text_save(filename, record):
    # filename: path of the txt file to write
    # record: list of [F_SDaqID_MAX, record_date] entries to be written
    file = open(filename, 'a')   # append mode
    # file = open(filename, 'w') # overwrite mode
    for i in range(len(record)):
        s = str(record[i]).replace('[', '').replace(']', '')
        # Remove single quotes and commas, append a newline at the end of each line
        s = s.replace("'", '').replace(',', '') + '\n'
        file.write(s)
    file.close()
```
Incremental Latest Record ID (F_SDaqID_MAX): Temporary File Storage
The incremental ID record offers two implementation options: persistent storage and temporary file storage. Persistent storage, as the name suggests, means that key operational information such as the incremental ID F_SDaqID_MAX is recorded durably when the object is created; this kind of flag-record mapping is the design commonly chosen.
Database Connection Classes
Implementing real-time incremental data acquisition requires two database connection classes: an incremental-data ID storage class and an incremental target data source class. Here the database operation classes are implemented with the singleton pattern, so that incremental service record information is written to the database or to a specific log file sequentially, keeping the data consistent.
1. Incremental data ID storage: SQLite connection class code
```python
class Database_sqlite(metaclass=MetaSingleton):
    database_path = r'.\Database\energy_rc_configure.db'
    connection = None

    def connect(self):
        if self.connection is None:
            self.connection = sqlite3.connect(self.database_path, check_same_thread=False, isolation_level=None)
            self.cursorobj = self.connection.cursor()
        return self.cursorobj, self.connection

    # Insert the largest ID record
    @staticmethod
    def Insert_Max_ID_Record(f1, f2):
        cursor = Database_sqlite().connect()
        print(cursor)
        sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""
        cursor[0].execute(sql)
        # sql = "insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"
        # cursor[0].execute(sql, (f"{f1}", f"{f2}"))
        cursor[1].commit()
        print("Insertion successful!")
        # cursor[0].close()
        return

    # Fetch the latest primary ID record in the incremental database
    @staticmethod
    def View_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"
        cursor[0].execute(sql)
        results = cursor[0].fetchone()[0]
        # With the singleton pattern the database connection does not have to be closed
        # cursor[0].close()
        print("Latest Record ID", results)
        return results

    # Delete the data record ID
    @staticmethod
    def Del_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"
        cursor[0].execute(sql)
        # results = cursor[0].fetchone()[0]
        # cursor[0].close()
        cursor[1].commit()
        print("Deleted successfully.")
        return
```
2. Incremental data source: SQL Server connection class code
```python
class Database_sqlserver(metaclass=MetaSingleton):
    """
    Real-time (source) database
    """
    connection = None

    # def connect(self):
    def __init__(self):
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx", user="xxxxx", password="xxxxx", database="xxxxx", charset="utf8")
            if self.connection:
                print("Connection successful!")
            # Open a database cursor
            self.cursorobj = self.connection.cursor()
        # return self.cursorobj, self.connection

    # Get the largest ID in the data source
    @staticmethod
    def get_F_SDaqID_MAX():
        # cursor_insert = Database_sqlserver().connect()
        cursor_insert = Database_sqlserver().cursorobj
        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""
        cursor_insert.execute(sql_MAXID)  # Execute the query
        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # Fetch the record
        print("Largest ID value: {0}".format(F_SDaqID_MAX))
        return F_SDaqID_MAX

    # Extract incremental data
    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        # Start fetching incremental data
        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy where F_ID > {0}""".format(
            incremental_Max_ID)
        # cursor_find = Database_sqlserver().connect()
        cursor_find = Database_sqlserver().cursorobj
        cursor_find.execute(sql_incremental_data)  # Execute the query
        Target_data_source = cursor_find.fetchall()  # Fetch all matching records
        cursor_find.close()
        df = pd.DataFrame(
            Target_data_source,
            columns=["F_ID", "F_Datetime", "F_Data"])
        print("Extract data", df)
        return df
```
The design of the data resource application service mainly considers the consistency of database operations and the optimization of the various database operations, so as to improve memory and CPU utilization.
It implements a variety of read and write operations; the client calls this API to perform the corresponding DB operations.
Notes:
1. Using a metaclass to create classes with singleton behavior
```python
Database_sqlserver(metaclass=MetaSingleton)
Database_sqlite(metaclass=MetaSingleton)
```
When a new class is defined with the class statement and MetaSingleton is specified as its metaclass (as with the database class Database_sqlserver above), the metaclass's special method __call__ is executed automatically whenever the class is instantiated.
```python
class MetaSingleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
```
The code above is a metaclass-based singleton implementation: the client may instantiate the database class many times while performing database operations, but only a single object is ever created, so all calls to the database are synchronized through one shared instance.
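For example, assuming MetaSingleton is defined as above, a quick check (illustrative only; DemoDB is a hypothetical class) confirms that repeated instantiation yields one shared object:

```python
# Hypothetical class using the MetaSingleton metaclass defined above.
class DemoDB(metaclass=MetaSingleton):
    pass

d1 = DemoDB()
d2 = DemoDB()
print(d1 is d2)          # True: both names refer to the same instance
print(id(d1) == id(d2))  # True: same memory address
```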
2. Multi-threaded use of the same database connection resource requires a synchronization mechanism

Without one, unintended consequences may occur.
1) Locking with a with block
```python
import threading

class MetaSingleton(type):
    _instances = {}
    lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls.lock:
            if cls not in cls._instances:
                time.sleep(0.05)  # Simulate time-consuming creation
                cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
```
Creating and releasing locks consumes resources, and the code above must acquire the lock on every instantiation.
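A common way to avoid that cost (a sketch of double-checked locking, not code from the original tool; the class name MetaSingletonDCL is made up) is to take the lock only while the instance does not yet exist:

```python
import threading

class MetaSingletonDCL(type):
    _instances = {}
    lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:            # fast path: no lock once created
            with cls.lock:
                if cls not in cls._instances:    # re-check while holding the lock
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
```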
3. If the program we develop is not a single application but a clustered one, i.e., multiple clients share one database, then database operations cannot be synchronized through a process-local singleton, and database connection pooling is the better choice. It significantly saves memory, improves server efficiency, and can serve more clients.
The idea of connection pooling is to establish enough database connections at application startup, form them into a pool, and let the application dynamically acquire, use, and release connections from that pool. Concurrent requests exceeding the number of pooled connections wait in a request queue.
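A minimal sketch of this idea, assuming a SQLite backend and a made-up SimpleConnectionPool class (real projects would typically use a pooling library such as DBUtils or SQLAlchemy's built-in pool):

```python
import sqlite3
from queue import Queue

# Connections are created up front; acquire() hands one out and blocks
# (queuing the request) when all connections are in use.
class SimpleConnectionPool:
    def __init__(self, database_path, pool_size=5):
        self._pool = Queue(maxsize=pool_size)
        for _ in range(pool_size):
            conn = sqlite3.connect(database_path, check_same_thread=False)
            self._pool.put(conn)

    def acquire(self):
        return self._pool.get()   # blocks until a connection is free

    def release(self, conn):
        self._pool.put(conn)

pool = SimpleConnectionPool(r'.\Database\ID_Record.db', pool_size=3)
conn = pool.acquire()
try:
    conn.execute('CREATE TABLE IF NOT EXISTS demo(id INTEGER PRIMARY KEY)')
finally:
    pool.release(conn)
```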
Incremental Data Service Client
Incremental processing strategy: on the first load, check whether a latest record already exists in the incremental data table; if so, load from it directly. Otherwise, record the largest/latest data record ID or time point and save it to the incremental database or record file.
From the second load onward, only data after that largest/latest ID or time point is loaded. Once the load completes successfully, the incremental database or record file is updated synchronously with the new last record ID or time point.
Tables of this kind generally have an auto-increment column, which can also serve as this identifier. Here, the table's auto-increment column F_ID is used.
```python
class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not IncrementalRecordServer._instance:
            # IncrementalRecordServer._instance = super().__new__(cls)
            IncrementalRecordServer._instance = super(IncrementalRecordServer, cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self, changeServersID=None):
        """
        Variable initialization
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    # Decorator: callback that saves the local record file after each
    # add/change/delete operation (temporary record storage)
    def record(func):
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",
                      record=IncrementalRecordServer._servers)
            print("Saved successfully.")
            return v
        return Server_record

    # Add a service record
    @record
    def addServer(self):
        self._servers.append([int(self.F_SDaqID_MAX), self.record_date])
        print("Add record")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    # Modify a service record
    @record
    def changeServers(self):
        # self._servers.pop()
        # The manually modified record ID is passed in here
        self._servers.append([self.changeServersID, self.record_date])
        # Delete and then insert to implement the change
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("Updating records")

    # Delete a service record
    @record
    def popServers(self):
        # self._servers.pop()
        print("Deletion of records")
        Database_sqlite.Del_Max_ID_Records()

    # Latest service record
    def getServers(self):
        # print(self._servers[-1])
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("View Record", Max_ID_Records)
        return Max_ID_Records

    # Extract data
    def Incremental_data_client(self):
        """
        Extract data (get the incremental MAX ID and extract the incremental data)
        """
        # On the first load, check whether a latest record already exists
        if self.getServers() is None:
            # Insert the incremental database ID
            self.addServer()
            # Extract incremental data
            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
            return data
        # Get the latest max ID record that already exists in the incremental database
        incremental_Max_ID = self.getServers()
        # Add a record
        self.addServer()
        # Extract incremental data
        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)
        return Target_data_source
```
Optimization Strategies:
1. Delayed loading method
The incremental record service class IncrementalRecordServer above controls object creation by overriding the __new__ method, checking first whether the object already exists. The same effect can be achieved through lazy loading, which saves resources; the optimized version follows.
```python
class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __init__(self, changeServersID=None):
        """
        Variable initialization
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID
        if not IncrementalRecordServer._instance:
            print("__init__ object creation.")
        else:
            print("Object already exists:", self.getInstance())

    @classmethod
    def getInstance(cls):
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance
```
Lazy instantiation makes sure the object is created only when it is actually needed. Instantiating a = IncrementalRecordServer() calls the __init__ method, but the shared singleton instance is created only through getInstance(). Loading class objects this way is also known as delayed loading.
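A standalone demonstration of this behavior (a simplified stand-in class, since IncrementalRecordServer needs live database connections):

```python
# LazySingleton mirrors the lazy-loading structure above without any DB access.
class LazySingleton:
    _instance = None

    def __init__(self):
        if not LazySingleton._instance:
            print("__init__ called; shared instance not created yet")

    @classmethod
    def getInstance(cls):
        if not cls._instance:
            cls._instance = LazySingleton()
        return cls._instance

a = LazySingleton()               # runs __init__ only
s1 = LazySingleton.getInstance()  # the shared instance is created here
s2 = LazySingleton.getInstance()
print(s1 is s2)                   # True: one shared object
```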
2. The singleton pattern uses space resources effectively, reusing the same memory each time.
Different handles to the object share the same memory address, and each new initialization overwrites the variables set by the previous one, which keeps the latest record up to date in real time. On the surface the singleton code above looks fine, but under multi-threaded concurrency there are thread-safety issues: separate object spaces may be created at the same time. With thread safety in mind, locking can be added as a further safeguard.
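A hedged sketch of that further locking (the ThreadSafeRecordServer name is illustrative): guard the instance check in __new__ with a lock so concurrent threads cannot create separate objects:

```python
import threading

class ThreadSafeRecordServer:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:  # only one thread at a time may create the instance
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance
```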
3. Scope of application and precautions
This code is intended for production deployments that output incremental data after a specified point in time. If the tool has been out of use for a long time and is restarted, the historical IDs in the incremental database or record file must be cleared first; for ordinary real-time incremental loading, a single load per cycle causes no problems, so this is not a major concern (the file-mode code can be refined by the reader). When loading historical data, or when the volume produced per interval is too large, the code needs further changes: judge the data size and specify the starting node and the amount to load per batch. A solution for extracting data at the billion-row scale will be shared next time.
4. Gain a further understanding of Python's garbage collection mechanism; under concurrency, manage resources by optimizing the thread pool.
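For instance, a thread pool can bound how many extraction tasks run at once (a sketch with a hypothetical extract_task; the real extraction calls would go inside it):

```python
from concurrent.futures import ThreadPoolExecutor

def extract_task(batch_id):
    # placeholder for one incremental extraction call
    return batch_id

# At most 4 worker threads run concurrently; extra tasks wait their turn.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(extract_task, range(8)))
print(results)
```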
Finally, a method can be added to release resources:
```python
def __del__(self):
    class_name = self.__class__.__name__
    print(class_name, "Destroy")
```
del obj invokes __del__() to destroy the object and free its space, but a Python object is only freed when nothing references it anymore. If another variable in the program still references the instance, __del__() does not execute immediately, even when del is called manually. This is tied to Python's garbage collection implementation (reference counting).
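A small self-contained demonstration of this point:

```python
class Demo:
    def __del__(self):
        print("Demo destroyed")

a = Demo()
b = a          # a second reference to the same object
del a          # reference count drops to 1; __del__ does NOT run yet
print("after del a")
del b          # last reference removed; __del__ runs now
```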
Results Testing
```python
if __name__ == '__main__':
    for i in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        print("Record_ID", hc1._servers[i])
        # del hc1
        time.sleep(60)

    # Server2-client
    # Latest service record
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    # View incremental data
    hc2.Incremental_data_client()
```
Insert Record
This simulates inserting one record every minute; seven records end up in the incremental database.
```python
if __name__ == '__main__':
    # Server3-client
    # Manually modify the incremental start ID record
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.changeServers()
```
```python
if __name__ == '__main__':
    # Delete the ID record
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    # hc3.changeServers()
    hc3.popServers()
```
The above is the detailed solution for implementing a real-time incremental data loading tool in Python. For more on incremental data loading with Python, please see my other related articles!