Problem
It is generally not recommended to keep a database connection in a global variable. With a plain connection, the usual advice is to close it as soon as each operation completes, because holding a connection open for a long time ties up resources on both the client and the server. A better practice is to use a connection pool to maintain and hand out connections.
Benefits
A connection pool can be shared among multiple threads (each process normally needs its own pool). It manages the number of open connections for you, so you do not have to open and close each connection by hand.
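To make the idea concrete, here is a minimal sketch of what a pool does internally: it opens a fixed number of connections up front, lends them out, and takes them back instead of closing them. `TinyPool` and its methods are hypothetical names used only for illustration, and `sqlite3` stands in for a MySQL driver so the sketch runs without a server. It is not how DBUtils or SQLAlchemy are actually implemented.

```python
import queue
import sqlite3
from contextlib import contextmanager


class TinyPool:
    """Illustrative fixed-size pool; not a replacement for DBUtils or SQLAlchemy."""

    def __init__(self, creator, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(creator())  # open every connection up front

    @contextmanager
    def connection(self, timeout=None):
        # Blocks while the pool is empty; raises queue.Empty if timeout expires.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # hand the connection back instead of closing it

    def idle_count(self):
        return self._idle.qsize()


# sqlite3 is an assumption made so the example is runnable anywhere.
pool = TinyPool(lambda: sqlite3.connect(":memory:", check_same_thread=False), size=2)

with pool.connection() as conn:
    value = conn.execute("SELECT 1 + 1").fetchone()[0]
    idle_while_borrowed = pool.idle_count()

idle_after_return = pool.idle_count()
print(value, idle_while_borrowed, idle_after_return)
```

The blocking `get` plays roughly the same role as the `blocking=True` option used with PooledDB later in this article: a caller waits for a free connection rather than opening an extra one.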
Commonly used packages
QueuePool in SQLAlchemy and PooledDB in DBUtils are both popular Python connection pool implementations. They offer similar functionality, with some differences.
QueuePool is the connection pool built into SQLAlchemy. It manages a queue of connections and makes sure each one is returned to the pool after use. It supports a configurable pool size, an overflow limit and other options. Its advantages are ease of use, no dependencies beyond SQLAlchemy itself, and seamless integration with SQLAlchemy.
PooledDB is the connection pool provided by the DBUtils library. It works with any DB-API 2 compliant driver, such as pymysql, and can also be used alongside SQLAlchemy. DBUtils offers more than one pooling strategy (PersistentDB and PooledDB), is designed to be thread-safe, and provides conveniences such as limiting how many idle connections are kept in the pool.
To summarize, DBUtils is the recommended choice for the examples that follow.
Code
import pymysql
from dbutils.pooled_db import PooledDB

host = 'localhost'
port = 3306
user = 'root'
password = '123456'
database = 'mytest'


class MySQLConnectionPool:

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,     # DB-API module used to create connections
            mincached=10,        # idle connections opened at startup; 0 means none
            maxconnections=200,  # maximum connections allowed; 0 or None means no limit
            blocking=True,       # when the pool is exhausted: True waits, False raises an error
            host=host,
            port=port,
            user=user,
            password=password,
            database=database
        )

    def open(self):
        # Local variables (not attributes) so that threads do not overwrite each other
        conn = self.pool.connection()
        # DictCursor returns each row as a dictionary
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    def close(self, conn, cursor):
        cursor.close()
        conn.close()  # returns the connection to the pool

    def select_one(self, sql, args=None):
        """Query a single row"""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchone()
        finally:
            self.close(conn, cursor)

    def select_all(self, sql, args=None):
        """Query multiple rows"""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            self.close(conn, cursor)

    def insert_one(self, sql, args):
        """Insert a single row"""
        self.execute(sql, args, isNeed=True)

    def insert_all(self, sql, datas):
        """Insert multiple rows in one batch"""
        conn, cursor = self.open()
        try:
            cursor.executemany(sql, datas)
            conn.commit()
            return {'result': True, 'id': int(cursor.lastrowid)}
        except Exception as err:
            conn.rollback()
            return {'result': False, 'err': err}
        finally:
            self.close(conn, cursor)

    def update_one(self, sql, args):
        """Update data"""
        self.execute(sql, args, isNeed=True)

    def delete_one(self, sql, args):
        """Delete data"""
        self.execute(sql, args, isNeed=True)

    def execute(self, sql, args, isNeed=False):
        """
        Execute a statement and commit.
        :param isNeed: whether to roll back (and suppress the error) on failure
        """
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            conn.commit()
        except Exception:
            if not isNeed:
                raise
            conn.rollback()
        finally:
            self.close(conn, cursor)


"""
CREATE TABLE `names` (
  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'Primary Key',
  `name` VARCHAR(30) DEFAULT NULL COMMENT 'Name',
  `sex` VARCHAR(20) DEFAULT NULL COMMENT 'gender',
  `age` int(5) DEFAULT NULL COMMENT 'age',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='Import data into mysql';
"""

mysql = MySQLConnectionPool()

sql_insert_one = "insert into `names` (`name`, sex, age) values (%s,%s,%s)"
mysql.insert_one(sql_insert_one, ('Tang San', 'male', 25))

datas = [
    ('Dai Mubai', 'male', 26),
    ('Oscar', 'male', 26),
    ('Tang San', 'male', 25),
    ('Xiaowu', 'female', 100000),
    ('Ma Hongjun', 'male', 23),
    ('Ning Rongrong', 'female', 22),
    ('Zhu Zhuqing', 'female', 21),
]
sql_insert_all = "insert into `names` (`name`, sex, age) values (%s,%s,%s)"
mysql.insert_all(sql_insert_all, datas)

sql_update_one = "update `names` set age=%s where `name`=%s"
mysql.update_one(sql_update_one, (28, 'Tang San'))

sql_delete_one = 'delete from `names` where `name`=%s '
mysql.delete_one(sql_delete_one, ('Tang San',))

sql_select_one = 'select * from `names` where `name`=%s'
results = mysql.select_one(sql_select_one, ('Tang San',))
print(results)

sql_select_all = 'select * from `names` where `name`=%s'
results = mysql.select_all(sql_select_all, ('Tang San',))
print(results)
Using a connection pool to operate MySQL in Python
Test environment: Python 3.8.10, DBUtils 3.1.0, pymysql 1.0.3.
First install the specified versions of the connection pool library DBUtils and of pymysql:
pip install DBUtils==3.1.0
pip install pymysql==1.0.3
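The import path for PooledDB depends on the installed DBUtils release: version 2.0 and later use `dbutils.pooled_db`, while the 1.x series used `DBUtils.PooledDB`. If the code has to run against either, one possible approach is to try both paths. `load_pooled_db` is a hypothetical helper name, not part of DBUtils.

```python
import importlib


def load_pooled_db():
    """Return the PooledDB class from whichever DBUtils layout is installed, or None."""
    # Newer layout first (DBUtils >= 2.0), then the legacy 1.x layout.
    for module_name in ("dbutils.pooled_db", "DBUtils.PooledDB"):
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue
        return getattr(module, "PooledDB")
    return None


PooledDB = load_pooled_db()
if PooledDB is None:
    print("DBUtils is not installed; run: pip install DBUtils==3.1.0")
else:
    print("PooledDB loaded from", PooledDB.__module__)
```

With the versions pinned above, a plain `from dbutils.pooled_db import PooledDB` is enough; the fallback only matters for older environments.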
Create a file named sqlConfig.py
# sqlConfig.py
import pymysql
from dbutils.pooled_db import PooledDB
# DBUtils 1.x uses the following import instead:
# from DBUtils.PooledDB import PooledDB

host = '127.0.0.1'
port = 3306
user = 'myname'
password = 'mypass'
database = 'contest'


class MySQLConnectionPool:

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,     # DB-API module used to create connections
            mincached=10,        # idle connections opened at startup; 0 means none
            maxconnections=200,  # maximum connections allowed; 0 or None means no limit
            blocking=True,       # when the pool is exhausted: True waits, False raises an error
            host=host,
            port=port,
            user=user,
            password=password,
            database=database
        )

    def open(self):
        # Local variables (not attributes) so that threads do not overwrite each other
        conn = self.pool.connection()
        # DictCursor returns each row as a dictionary
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    def close(self, conn, cursor):
        cursor.close()
        conn.close()  # returns the connection to the pool

    def select_one(self, sql, args=None):
        """Query a single row"""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchone()
        finally:
            self.close(conn, cursor)

    def select_all(self, sql, args=None):
        """Query multiple rows"""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            self.close(conn, cursor)

    def insert_one(self, sql, args):
        """Insert a single row"""
        self.execute(sql, args, isNeed=True)

    def insert_all(self, sql, datas):
        """Insert multiple rows in one batch"""
        conn, cursor = self.open()
        try:
            cursor.executemany(sql, datas)
            conn.commit()
            return {'result': True, 'id': int(cursor.lastrowid)}
        except Exception as err:
            conn.rollback()
            return {'result': False, 'err': err}
        finally:
            self.close(conn, cursor)

    def update_one(self, sql, args):
        """Update data"""
        self.execute(sql, args, isNeed=True)

    def delete_one(self, sql, args):
        """Delete data"""
        self.execute(sql, args, isNeed=True)

    def execute(self, sql, args, isNeed=False):
        """
        Execute a statement and commit.
        :param isNeed: whether to roll back (and suppress the error) on failure
        """
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            conn.commit()
        except Exception:
            if not isNeed:
                raise
            conn.rollback()
        finally:
            self.close(conn, cursor)
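Several methods of the class repeat the same sequence: borrow a connection, run the statement, commit or roll back, and return the connection. One way to factor that out is a context manager. The sketch below is only an illustration under stated assumptions: `transaction` and `FakePool` are hypothetical names, and `sqlite3` stands in for MySQL so the example runs without a server. With DBUtils you would pass the real PooledDB object, whose `connection()` method is the one the class above already calls.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(pool):
    """Borrow a connection, commit on success, roll back on error, always return it."""
    conn = pool.connection()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # let the caller see the failure instead of hiding it
    finally:
        cursor.close()
        conn.close()  # for a PooledDB connection this returns it to the pool


class FakePool:
    """Stand-in exposing connection(); uses a shared in-memory SQLite database."""

    def __init__(self):
        self._uri = "file:pool_demo?mode=memory&cache=shared"
        # One connection stays open so the in-memory database is not discarded.
        self._keepalive = sqlite3.connect(self._uri, uri=True)

    def connection(self):
        return sqlite3.connect(self._uri, uri=True)


pool = FakePool()

with transaction(pool) as cur:
    cur.execute("CREATE TABLE names (name TEXT, age INTEGER)")
    cur.execute("INSERT INTO names VALUES (?, ?)", ("Tang San", 25))

try:
    with transaction(pool) as cur:
        cur.execute("INSERT INTO names VALUES (?, ?)", ("Oscar", 26))
        raise RuntimeError("simulated failure")  # triggers the rollback branch
except RuntimeError:
    pass

with transaction(pool) as cur:
    cur.execute("SELECT name FROM names")
    rows = cur.fetchall()

print(rows)
```

Unlike the `isNeed=True` branch of `execute`, this version re-raises after rolling back, so a failed write is not silently ignored. Whether that is preferable depends on how the caller wants to handle errors.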
Create another file that imports the class and uses it
# Import the connection pool class
from sqlConfig import MySQLConnectionPool

# Create a connection pool object
ConnPool = MySQLConnectionPool()

# Fuzzy query
strSelectAll = "select * from names where name like %s"
results = ConnPool.select_all(strSelectAll, ('%Tang%',))
print(results)

# Exact query
# strSelectAll = "select * from names where name=%s"
# results = ConnPool.select_all(strSelectAll, ('Tang San',))
# print(results)

# Single-row query
# strSelectOne = 'select * from `names` where `name`=%s'
# results = ConnPool.select_one(strSelectOne, ('Tang San',))
# print(results)

# Single insert
# strInsertOne = "insert into `names` (`name`, sex, age) values (%s,%s,%s)"
# ConnPool.insert_one(strInsertOne, ('Tang San', 'Male', 22))

# Batch insert
# datas = [
#     ('Dai Mubai', 'Male', 26),
#     ('Oscar', 'Male', 26),
#     ('Tang San', 'Male', 25),
#     ('Xiao Wu', 'Female', 100000),
#     ('Ma Hongjun', 'Male', 23),
#     ('Ning Rongrong', 'Female', 22),
#     ('Zhu Zhuqing', 'Female', 21),
# ]
# sql_insert_all = "insert into `names` (`name`, sex, age) values (%s,%s,%s)"
# ConnPool.insert_all(sql_insert_all, datas)

# Update
# sql_update_one = "update `names` set age=%s where `name`=%s"
# ConnPool.update_one(sql_update_one, (28, 'Tang San'))

# Delete
# sql_delete_one = 'delete from `names` where `name`=%s '
# ConnPool.delete_one(sql_delete_one, ('Tang San',))
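In the fuzzy query, the search term is wrapped in `%` wildcards and passed as a parameter. If the term comes from user input, any `%` or `_` inside it would also act as a wildcard. A small helper can escape them first. `like_contains` is a hypothetical name, and the sketch assumes MySQL's default LIKE escape character (backslash) with the `NO_BACKSLASH_ESCAPES` SQL mode disabled.

```python
def like_contains(term, escape="\\"):
    """Build a 'contains' LIKE pattern, escaping wildcard characters inside term."""
    escaped = (
        term.replace(escape, escape + escape)  # escape the escape character first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )
    return f"%{escaped}%"


sql = "select * from names where name like %s"
params = (like_contains("Tang"),)
print(sql, params)
```

The resulting tuple can be passed as the second argument of `select_all`, so the value is still sent as a query parameter rather than being formatted into the SQL string.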