
Python solution to implement a real-time incremental data loading tool

This article shares a solution for implementing a real-time incremental data loading tool, built around a real-world use case of the singleton pattern. The key is to implement an incremental ID record table that supports adding, modifying, deleting and other operations.

Singleton Pattern: provides a global access point to ensure that a class has one and only one object of a particular type. It is typically used in scenarios such as logging or database operations, to avoid conflicting requests for a single resource.

Creating an Incremental ID Record Table

import sqlite3
import datetime
import pymssql
import pandas as pd
import time
pd.set_option('expand_frame_repr', False)

Import the required modules

# Create data tables
database_path = r'.\Database\ID_Record.db'
from sqlite3 import connect

with connect(database_path) as conn:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record(id INTEGER PRIMARY KEY AUTOINCREMENT,F_SDaqID_MAX TEXT,record_date datetime)')
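
As a quick sanity check (not part of the original tool), you can list the tables in the freshly created database via sqlite_master:

with connect(database_path) as conn:
    # Expect Incremental_data_max_id_record to appear in the result
    tables = conn.execute(
        "select name from sqlite_master where type='table'").fetchall()
    print(tables)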

Incremental Latest Record ID-F_SDaqID_MAX Database Storage

#Data saved to local txt
def text_save(filename, record):  # filename: path of the txt file to write; record: list of [F_SDaqID_MAX, record_date] entries
    file = open(filename, 'a')  # append mode
    # file = open(filename, 'w')  # overwrite mode
    for i in range(len(record)):
        s = str(record[i]).replace('[', '').replace(']', '')
        s = s.replace("'", '').replace(',', '') + '\n'  # remove single quotes and commas, append a newline to each line
        file.write(s)
    file.close()
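
For example, appending one sample [F_SDaqID_MAX, record_date] pair (path and values are illustrative):

# Append a single record to the local txt file
text_save(r'.\Database\Server_record.txt',
          [[346449980, '2021-07-01 12:00:00']])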

Incremental Latest Record ID-F_SDaqID_MAX Temporary File Storage

The incremental ID record supports two implementation options: persistent storage and temporary file storage. In persistence mode, as the name suggests, key operational information such as the incremental ID F_SDaqID_MAX is recorded when the object is created; this kind of flag-record mapping is a commonly chosen design.

Database Connection Classes

Implementing real-time incremental data acquisition requires two database connection classes: an incremental data ID storage class and an incremental target data source class. Here the database operation classes are implemented with the singleton pattern, so that incremental service record information is written to the database or to a specific log file sequentially, maintaining data consistency.

1, sqlite connection class for incremental data ID storage

class Database_sqlite(metaclass=MetaSingleton):
    database_path = r'.\Database\energy_rc_configure.db'
    connection = None
    def connect(self):
        if self.connection is None:
            self.connection = sqlite3.connect(self.database_path, check_same_thread=False, isolation_level=None)
            self.cursorobj = self.connection.cursor()
        return self.cursorobj, self.connection

    # Maximum records inserted
    @staticmethod
    def Insert_Max_ID_Record(f1, f2):

        cursor = Database_sqlite().connect()
        print(cursor)

        sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""
        cursor[0].execute(sql)

        # sql = "insert  into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"
        # cursor[0].execute(sql,(f"{f1}",f"{f2}"))

        cursor[1].commit()
        print("Insertion successful!")
        # cursor[0].close()
        return 

    # Fetch the latest primary ID record in the incremental database
    @staticmethod
    def View_Max_ID_Records():

        cursor = Database_sqlite().connect()
        sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"
        cursor[0].execute(sql)
        results = cursor[0].fetchone()[0]
        # # The singleton pattern doesn't have to close the database connection
        # cursor[0].close()
        print("Latest Record ID", results)
        return results

    #Delete Data Record ID
    @staticmethod
    def Del_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"
        cursor[0].execute(sql)
        # results = cursor[0].fetchone()[0]
        # # cursor[0].close()
        cursor[1].commit()
        print("Deleted successfully.")
        return
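
A minimal usage sketch of the three operations above (the ID and date values are illustrative):

# Insert a record ID, read the latest one back, then delete it
Database_sqlite.Insert_Max_ID_Record('346449980', '2021-07-01 12:00:00')
latest = Database_sqlite.View_Max_ID_Records()
Database_sqlite.Del_Max_ID_Records()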

2, sqlserver connection class for the incremental data source

class Database_sqlserver(metaclass=MetaSingleton):
    """
    # Real-time database
    """
    connection = None

    # def connect(self):
    def __init__(self):
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx", user="xxxxx", password="xxxxx", database="xxxxx", charset="utf8")
            if self.connection:
                print("Connection successful!")
            # Open a database connection
            self.cursorobj = self.connection.cursor()
        # return self.cursorobj, self.connection

    # Get the largest ID in the data source
    @staticmethod
    def get_F_SDaqID_MAX():
        # cursor_insert = Database_sqlserver().connect()
        cursor_insert = Database_sqlserver().cursorobj

        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""

        cursor_insert.execute(sql_MAXID)  # Execute a query statement to select all the data in the table

        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # Get the record

        print("greatestID(be) worth:{0}".format(F_SDaqID_MAX))

        return F_SDaqID_MAX

    # Extract incremental data
    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        # Start getting incremental data
        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > {0}""".format(
            incremental_Max_ID)

        # cursor_find = Database_sqlserver().connect()
        cursor_find = Database_sqlserver().cursorobj

        cursor_find.execute(sql_incremental_data)  # Execute a query statement to select all the data in the table

        Target_data_source = cursor_find.fetchall()  # Access to all data records

        # cursor_find.close()
        cursor_find.close()

        df = pd.DataFrame(
            Target_data_source,
            columns=[
                "F_ID",
                "F_Datetime",
                "F_Data"])

        print("Extract data", df)
        return df
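
Assuming valid connection parameters have been filled in above, the source-side calls look like this (sketch; the starting ID is illustrative):

# Read the current maximum F_SDaqID, then pull all rows newer than a known ID
current_max = Database_sqlserver.get_F_SDaqID_MAX()
df_new = Database_sqlserver.get_incremental_data(346449980)
print(df_new.head())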

The design of the data resource application service mainly considers the consistency of database operations and the optimization of the various database operations, improving memory and CPU utilization.

It implements a variety of read and write operations; the client calls this API to perform the corresponding DB operation.

Notes:

1, use a metaclass to create a class with singleton characteristics

Database_sqlserver(metaclass=MetaSingleton)

Database_sqlite(metaclass=MetaSingleton)

When a new class is defined with class, the database class Database_sqlserver is decorated by MetaSingleton, i.e., its metaclass is specified; the special __call__ method of MetaSingleton is then executed automatically whenever the class is instantiated.

class MetaSingleton(type):
    _instances={}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
        return cls._instances[cls]

The code above implements the singleton via the metaclass: when the client performs operations on the database, the database class may be instantiated multiple times, but only a single object is created, so calls to the database are synchronized.
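
This behavior can be verified directly; repeated instantiation returns the cached object:

# Both "instances" are the same object, so they share one connection
a = Database_sqlite()
b = Database_sqlite()
print(a is b)  # True: MetaSingleton returned the cached instance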

2, multiple threads using the same database connection resource must adopt a synchronization mechanism

If no synchronization mechanism is used, unintended consequences may occur.

1) Locking with the with statement

import threading

class MetaSingleton(type):
    _instances={}
    lock = threading.Lock()
    def __call__(cls, *args, **kwargs):
        with cls.lock:
            if cls not in cls._instances:
                time.sleep(0.05)  # Simulate time-consuming creation
                cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
            return cls._instances[cls]

Locks consume resources to create and release, and the code above must acquire the lock on every instantiation, even after the instance already exists.
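
A common refinement (not in the original code) is double-checked locking: test for the instance first, and only acquire the lock when it has not been created yet. A sketch:

class MetaSingletonDCL(type):
    _instances = {}
    lock = threading.Lock()
    def __call__(cls, *args, **kwargs):
        # First check without the lock: cheap on every call after creation
        if cls not in cls._instances:
            with cls.lock:
                # Second check inside the lock: another thread may have
                # created the instance while we were waiting
                if cls not in cls._instances:
                    cls._instances[cls] = super(MetaSingletonDCL, cls).__call__(*args, **kwargs)
        return cls._instances[cls]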

3, if the program we develop is not a standalone application but a clustered one, i.e., multiple clients share a single database, operations on the database cannot be synchronized through a single process, and database connection pooling is a better choice. It significantly saves memory, improves the efficiency of server services, and supports more client services.

The connection pooling solution establishes enough database connections at application startup and forms them into a pool, from which the application dynamically acquires, uses and releases connections. Concurrent requests exceeding the number of connections in the pool wait in a request queue.
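
A minimal pool sketch using only the standard library (illustrative; production code would more likely use a pooling library such as DBUtils or SQLAlchemy's pool):

import queue

class ConnectionPool:
    """Pre-create connections at startup; borrow and return them via a queue."""
    def __init__(self, database_path, pool_size=5):
        self._pool = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self._pool.put(sqlite3.connect(database_path, check_same_thread=False))

    def acquire(self, timeout=None):
        # Blocks (queuing the request) when all connections are in use
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        self._pool.put(conn)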

Incremental Data Service Client

Incremental processing strategy: on the first load, first determine whether the latest record exists in the incremental data table; if so, load directly from it; otherwise, record the largest/latest data record ID or point in time and save it to the incremental database or record file.

From the second load onward, only the data after the largest/latest ID or point in time is loaded. When the whole loading process completes successfully, the incremental database or record file is updated synchronously with the last record ID or point in time of this batch.

Generally this type of data table has an auto-increment column, which can also be used to implement this identification feature. Here, the table's auto-increment column F_ID is used.

class IncrementalRecordServer:
    _servers = []
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not IncrementalRecordServer._instance:
            # IncrementalRecordServer._instance = super().__new__(cls)
            IncrementalRecordServer._instance = super(IncrementalRecordServer,cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self,changeServersID=None):

        """
        Variable initialization process
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    # Callbacks to update local records, clear record replacement, temporary records
    def record(func):
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers)
            print("Saved successfully.")

            return v
        return Server_record

    # Increase in service records
    @record
    def addServer(self):
        self._servers.append([int(self.F_SDaqID_MAX),self.record_date])
        print("Add record")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    # Modify service records
    @record
    def changeServers(self):
        # self._servers.pop()
        # Pass in the manually modified record ID here
        self._servers.append([self.changeServersID,self.record_date])
        # Delete and then insert to implement changes
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("Updating records")

    # Delete service records
    @record
    def popServers(self):
        # self._servers.pop()
        print("Deletion of records")
        Database_sqlite.Del_Max_ID_Records()

    # Latest service records
    def getServers(self):
        # print(self._servers[-1])
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("View Record",Max_ID_Records)
        return Max_ID_Records

    # Extract data
    def Incremental_data_client(self):
        """
        # Extract data (incremental data MAXID acquisition and extraction of incremental data)
        """
        # Real-time database
        # The first load determines if the latest record exists.
        if self.getServers() is None:
            # Insert incremental database ID
            self.addServer()
            # Extract incremental data
            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
            return data

        # Get the latest max ID record that already exists in the incremental database
        incremental_Max_ID = self.getServers()

        # Add records
        self.addServer()
        # Extract incremental data
        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)

        return Target_data_source

Optimization Strategies:

1. Delayed loading method

The incremental record service class IncrementalRecordServer above controls object creation by overriding the __new__ method, first checking whether the object already exists. The same can be achieved with lazy loading, which saves resources; the optimized version is as follows.

class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __init__(self,changeServersID=None):
        """
        Variable initialization process
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

        if not IncrementalRecordServer._instance:
            print("__init__ object creation.")
        else:
            print("Object already exists:",IncrementalRecordServer._instance)
            self.getInstance()

    @classmethod
    def getInstance(cls):
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance

Lazy instantiation ensures that the object is created only when it is actually needed. Instantiating a = IncrementalRecordServer() calls the __init__ initialization method, but no new cached object is created; obtaining the class object this way is also known as delayed loading.
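
A usage sketch (assuming the database connections above are available): the class method is the access point, and the cached instance is only built on the first call:

inst1 = IncrementalRecordServer.getInstance()  # builds and caches the instance
inst2 = IncrementalRecordServer.getInstance()  # reuses the cached instance
print(inst1 is inst2)  # True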

2. The singleton pattern makes effective use of space resources, using the same memory space each time.

Different operating objects share the same memory address, and each object initialization overwrites the variables set by the previous initialization, which keeps the latest records up to date in real time. On the surface, the code above implements the singleton pattern without problems, but under multi-threaded concurrency there are thread-safety issues: different object spaces may be created at the same time. To account for thread safety, the creation can be further protected with a lock.

3. Scope of application and precautions

This code applies to loading incremental data produced after a specified point in time in production. If the tool has been disabled for a long time and is restarted, the historical ID in the incremental database or file needs to be cleared first. For ordinary real-time incremental loads, a single load is unproblematic, so this is not a major concern (the file-mode code can be refined on your own). When loading historical data, or when the data volume produced in a timed interval is too large, the code needs further modification: determine the data size, and specify the starting node and the amount of data to load, weighing these factors together. Next time I will share an extraction scheme for data volumes in the billions.
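
For the large-volume case, one possible sketch (not part of the original code) is to page the extraction by the auto-increment column, loading a fixed batch per query; table and column names follow the article:

# Pull incremental rows in fixed-size batches instead of one large query
def get_incremental_data_batched(last_id, batch_size=100000):
    cursor = Database_sqlserver().cursorobj
    rows_all = []
    while True:
        sql = ("select top {0} F_ID,F_Datetime,F_Data from T_DaqDataForEnergy "
               "where F_ID > {1} order by F_ID").format(batch_size, last_id)
        cursor.execute(sql)
        rows = cursor.fetchall()
        if not rows:
            break
        rows_all.extend(rows)
        last_id = rows[-1][0]  # advance the starting node to the last F_ID seen
    return pd.DataFrame(rows_all, columns=["F_ID", "F_Datetime", "F_Data"])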

4, gain a further understanding of Python's garbage collection mechanism; under concurrency, manage resources by optimizing with a thread pool.

Finally, you can add a function to free up resources

def __del__(self):
    class_name = self.__class__.__name__
    print(class_name,"Destroy")

del obj calls __del__() to destroy the object and free its space, but a Python object is only freed when it is no longer referenced. While another variable in the program still references the instance, the __del__() method will not execute, even if you call del manually. This is a consequence of Python's garbage collection (reference counting) implementation.
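
A short illustration of that point with a throwaway class:

class Demo:
    def __del__(self):
        print(self.__class__.__name__, "Destroy")

obj = Demo()
alias = obj
del obj    # nothing is printed: 'alias' still references the instance
del alias  # the reference count reaches zero, __del__ runs and prints "Demo Destroy"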

Results testing

if __name__ == '__main__':
    for i in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        print("Record_ID",hc1._servers[i])
        # del hc1
        time.sleep(60)

    #Server2-client client
    # Latest service records
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    #View incremental data
    hc2.Incremental_data_client()

Insert Record

Simulate the insertion of one record per minute; 7 records are inserted into the incremental database in total.

if __name__ == '__main__':
    # Server3-client client
    # Manually add incremental start ID records
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.changeServers()

if __name__ == '__main__':
    # Delete ID
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    # hc3.changeServers()
    hc3.popServers()

The above is the detailed Python solution for a real-time incremental data loading tool. For more information about Python incremental data loading, please follow my other related articles!