Connection pool implementation
socket_pool.py
# -*- coding: utf-8 -*-
"""Queue-backed pool of TCP client sockets with a periodic health check."""
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty, Full

_logger = logging.getLogger('mylogger')


class SocketPool:
    """Pool of connected TCP sockets to a single ``host:port``.

    Idle sockets live in a bounded FIFO queue; sockets handed out to callers
    are tracked in ``busy_sockets_dict``. A daemon thread pings every idle
    socket once per ``HEALTH_CHECK_INTERVAL`` seconds, replaces the ones that
    fail, and tops the idle queue back up to ``min_connections``.
    """

    # Seconds between two health-check passes.
    HEALTH_CHECK_INTERVAL = 60
    # Heartbeat packet sent to verify a connection.
    HEARTBEAT_REQUEST = b'PING<END>'
    # Exact byte length of the server's heartbeat reply. It must match the
    # server: reading fewer bytes leaves heartbeat residue in the stream and
    # corrupts the next real response read from this socket.
    HEARTBEAT_RESPONSE_SIZE = 11

    def __init__(self, host, port, min_connections=10, max_connections=10):
        """Create the pool, pre-connect the sockets and start the health check.

        :param host: target host address
        :param port: target port number
        :param min_connections: number of idle connections to keep available
        :param max_connections: maximum number of connections
        :raises ConnectionError: if a pre-created connection cannot be opened
        """
        if 0 < max_connections < min_connections:
            # The idle queue holds at most max_connections sockets; filling it
            # with more would block forever, so cap the minimum instead.
            _logger.warning(
                'min_connections (%s) exceeds max_connections (%s); using %s',
                min_connections, max_connections, max_connections)
            min_connections = max_connections
        self.host = host
        self.port = port
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.busy_sockets_dict = {}  # sockets currently taken out of the pool
        self._sock_lock = threading.Lock()  # guards the queue/busy bookkeeping
        self._pool = Queue(max_connections)  # idle connections
        self._lock = threading.Lock()  # serialises health-check passes
        self._init_pool()
        self._start_health_check()

    def _init_pool(self):
        """Pre-create ``min_connections`` sockets and put them in the pool."""
        try:
            for _ in range(self.min_connections):
                self._pool.put(self._create_socket())
        except ConnectionError:
            # Do not leak the sockets that were opened before the failure.
            self.close_all()
            raise

    def _create_socket(self):
        """Open and return a new connected TCP socket.

        :raises ConnectionError: if the connection attempt fails
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError as e:
            sock.close()  # release the descriptor of the failed attempt
            raise ConnectionError(f'Failed to connect: {e}') from e
        return sock

    def _put_idle(self, sock):
        """Queue *sock* as idle, closing it instead if the queue is full.

        Must be called with ``_sock_lock`` held.
        """
        try:
            self._pool.put_nowait(sock)
        except Full:
            sock.close()

    def _ping(self, sock):
        """Return True if *sock* answers the heartbeat with a full reply."""
        try:
            sock.sendall(self.HEARTBEAT_REQUEST)
            remaining = self.HEARTBEAT_RESPONSE_SIZE
            while remaining:
                chunk = sock.recv(remaining)
                if not chunk:
                    # An empty read means the peer closed the connection.
                    _logger.error('Socket connection health check error: '
                                  'connection closed by peer')
                    return False
                remaining -= len(chunk)
        except OSError:
            _logger.error('Socket connection health check error: %s, close failed connection and create new connection replacement' % traceback.format_exc())
            return False
        return True

    def _check_idle_connections(self):
        """Ping every idle socket once and replace the ones that fail."""
        for _ in range(self._pool.qsize()):
            with self._sock_lock:
                try:
                    sock = self._pool.get_nowait()
                except Empty:
                    # Borrowers emptied the pool since qsize() was read.
                    return
                # Count the socket as in use while it is being checked.
                self.busy_sockets_dict[sock] = 1
            # Network I/O happens outside the lock so borrowers are not stalled.
            replacement = sock
            if not self._ping(sock):
                sock.close()
                try:
                    replacement = self._create_socket()
                except ConnectionError:
                    _logger.error('Socket connection health check error: %s' % traceback.format_exc())
                    replacement = None
            with self._sock_lock:
                self.busy_sockets_dict.pop(sock, None)
                if replacement is not None:
                    self._put_idle(replacement)

    def _top_up_pool(self):
        """Create idle sockets until ``min_connections`` are available.

        Never grows the pool beyond ``max_connections`` sockets in total.
        """
        with self._sock_lock:
            idle = self._pool.qsize()
            missing = self.min_connections - idle
            if self.max_connections > 0:
                headroom = (self.max_connections - idle
                            - len(self.busy_sockets_dict))
                missing = min(missing, headroom)
        for _ in range(missing):
            sock = self._create_socket()
            with self._sock_lock:
                self._put_idle(sock)

    def _start_health_check(self):
        """Start the daemon thread that periodically validates the pool."""
        def check():
            while True:
                try:
                    with self._lock:
                        self._check_idle_connections()
                        self._top_up_pool()
                except Exception:
                    # Keep the thread alive; the next pass retries.
                    _logger.error('Socket connection health check error: %s' % traceback.format_exc())
                time.sleep(self.HEALTH_CHECK_INTERVAL)  # check every 60 seconds

        threading.Thread(target=check, daemon=True).start()

    def get_connection(self):
        """Take a connection from the pool, opening a new one if allowed.

        :return: socket object
        :raises queue.Empty: if the pool is empty and ``max_connections``
            sockets are already in use
        :raises ConnectionError: if a new connection had to be opened and the
            attempt failed
        """
        with self._sock_lock:
            try:
                sock = self._pool.get_nowait()
            except Empty:
                if len(self.busy_sockets_dict) >= self.max_connections:
                    raise Empty('No available connections in pool') from None
                sock = self._create_socket()
            self.busy_sockets_dict[sock] = 1
            return sock

    def release_connection(self, sock):
        """Return a connection to the pool.

        A closed socket is only removed from the busy bookkeeping; it is not
        queued again.

        :param sock: socket object to be returned (``None`` is ignored)
        """
        if sock is None:
            return
        with self._sock_lock:
            self.busy_sockets_dict.pop(sock, None)
            # fileno() is -1 once a socket has been closed.
            if sock.fileno() != -1:
                self._put_idle(sock)

    def close_all(self):
        """Close every idle connection and reset the busy bookkeeping.

        Sockets that are currently borrowed are not closed here.
        """
        with self._sock_lock:
            while True:
                try:
                    sock = self._pool.get_nowait()
                except Empty:
                    break
                sock.close()
            self.busy_sockets_dict = {}


host = os.getenv('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.getenv('MODBUS_TCP_SERVER_PORT', '9000'))
# The minimum is read from its own variable (the MAX variable was used for both).
min_connections = int(os.getenv('DJANGO_SOCKET_POOL_MIN_CONNECTIONS', '10'))
max_connections = int(os.getenv('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
# Shared module-level pool; importing this module opens the connections.
socketPool = SocketPool(host, port, min_connections, max_connections)
Using the connection pool
import logging
import traceback

from socket_pool import socketPool

_logger = logging.getLogger('mylogger')

# Marker that terminates every server response.
_END_MARKER = b'<END>'


def send_socket_msg(data, max_retries=3):
    """Send *data* over a pooled connection and return the server's reply.

    :param data: text to send; it is encoded as UTF-8
    :param max_retries: how many extra attempts to make, each on a fresh
        connection, when getting a connection or sending fails
    :return: ``{'success': True, 'message': <reply without the <END> marker>}``
        on success, otherwise ``{'success': False, 'message': <error text>}``
    """
    sock = None
    error_msg = ''
    # Bounded retry loop: unbounded recursion would end in RecursionError
    # while the server stays unreachable.
    for _ in range(max(max_retries, 0) + 1):
        try:
            sock = socketPool.get_connection()
            sock.sendall(data.encode('utf-8'))
            break
        except Exception:
            error_msg = 'Error sending message: %s' % traceback.format_exc()
            _logger.error(error_msg)
            if sock is not None:
                # Close the broken socket so the pool does not hand it out again.
                sock.close()
                socketPool.release_connection(sock)
                sock = None
    else:
        return {'success': False, 'message': error_msg}

    received = bytearray()
    try:
        while not received.endswith(_END_MARKER):
            chunk = sock.recv(4096)
            if not chunk:
                # An empty read means the peer closed the connection; without
                # this check the loop would spin forever.
                raise ConnectionError(
                    'Connection closed before the <END> marker was received')
            received += chunk
        # Decode once at the end so a multi-byte character split across two
        # chunks is still decoded correctly.
        response = received[:-len(_END_MARKER)].decode('utf-8')
        return {'success': True, 'message': response}
    except Exception:
        error_msg = 'Error getting message: %s' % traceback.format_exc()
        _logger.error(error_msg)
        # The stream may hold a partial reply; close the socket so it is not reused.
        sock.close()
        return {'success': False, 'message': error_msg}
    finally:
        # The connection must always be handed back to the pool.
        socketPool.release_connection(sock)
This concludes the article on implementing a queue-based TCP socket connection pool in Python. For more material on Python TCP socket connection pools, please search my previous articles or browse the related articles below. Thank you for your support!