Client-side connection pool management

Mar 1, 2025

Connection pooling patterns that work fine for small applications with simple resource access and few users can quickly become bottlenecks as your system scales. Even if individual operations remain simple, having many concurrent users accessing shared resources like PostgreSQL databases or RabbitMQ message queues can expose inefficiencies and limitations in your architecture. This post explores strategies for scaling connection pools to handle high concurrency, with a focus on client-side improvements. While we’ll use PostgreSQL and RabbitMQ as our primary examples, these patterns can help you manage any resource that requires connection pooling.

Common Issues

Thread Safety

When multiple threads access the database simultaneously, race conditions and data corruption can occur if connections aren’t properly synchronized. Connection pools must be thread-safe to prevent multiple threads from accessing the same connection simultaneously.

from abc import ABC, abstractmethod
import threading
from contextlib import contextmanager
import time

class ThreadSafeResource(ABC):
    _instance = None
    _lock = threading.RLock()

    def __new__(cls, **config):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self, **config):
        with self._lock:
            if not self._initialized:
                self.pool = self._create_pool(**config)
                self.retry_config = self._configure_retries()
                self._initialized = True

    @abstractmethod
    def _create_pool(self, **config):
        """Create and return the connection pool"""
        pass

    @abstractmethod 
    def _configure_retries(self):
        """Configure and return retry settings"""
        pass

    @abstractmethod
    def _handle_error(self, error):
        """Handle errors during resource usage"""
        pass

    @contextmanager
    def get_resource(self):
        resource = None
        try:
            with self._lock:
                resource = self.pool.get_resource()
            yield resource
        except Exception as e:
            self._handle_error(e)
            raise  # re-raise so callers such as execute_with_retry can react
        finally:
            if resource:
                with self._lock:
                    self.pool.return_resource(resource)

    def execute_with_retry(self, func, *args, **kwargs):
        """Execute a function with retry logic"""
        retries = self.retry_config.get('max_retries', 3)
        delay = self.retry_config.get('initial_delay', 0.1)
        max_delay = self.retry_config.get('max_delay', 10)
        backoff_factor = self.retry_config.get('backoff_factor', 2)

        last_exception = None
        for attempt in range(retries):
            try:
                with self.get_resource() as resource:
                    return func(resource, *args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt == retries - 1:
                    raise last_exception
                
                wait_time = min(delay * (backoff_factor ** attempt), max_delay)
                time.sleep(wait_time)
                continue

    def __del__(self):
        # Best-effort cleanup; only pools exposing a cleanup() hook are closed here
        with self._lock:
            pool = getattr(self, 'pool', None)
            if pool is not None and hasattr(pool, 'cleanup'):
                pool.cleanup()
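
With the fallback defaults in execute_with_retry (3 attempts, 0.1 s initial delay, backoff factor 2), a failing operation waits 0.1 s and then 0.2 s between attempts before the final exception is re-raised.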

Postgres Connection Pooling

Poor connection management is especially costly in PostgreSQL because of the way the server handles clients:

PostgreSQL implements a “process per user” client/server model. In this model, every client process connects to exactly one backend process. As we do not know ahead of time how many connections will be made, we have to use a “supervisor process” that spawns a new backend process every time a connection is requested. This supervisor process is called postmaster and listens at a specified TCP/IP port for incoming connections. Whenever it detects a request for a connection, it spawns a new backend process. Those backend processes communicate with each other and with other processes of the instance using semaphores and shared memory to ensure data integrity throughout concurrent data access.
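
You can observe this model directly from a client: every new connection is served by a distinct backend process. A minimal sketch (it assumes psycopg2 and a local database named mydb):

import psycopg2

# Open a few connections; the server forks one backend process per connection
conns = [psycopg2.connect(dbname="mydb") for _ in range(3)]
for conn in conns:
    with conn.cursor() as cur:
        cur.execute("SELECT pg_backend_pid()")
        print("backend pid:", cur.fetchone()[0])  # a different OS process each time
for conn in conns:
    conn.close()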

PostgreSQL creates a separate operating-system process for each connection rather than using a shared-thread model, which means significant CPU and memory overhead from context switching when many connections are active. And even though concurrency is process-based, backends still contend for locks on shared resources:

Source of Contention | Why It Happens                             | Mitigation
WAL Writes           | Multiple processes compete for WAL buffers | Use synchronous_commit=off, tune wal_writer_delay
Shared Buffers       | Processes compete for hot pages            | Increase shared_buffers, tune effective_cache_size
Indexes              | Index updates require page locks           | Lower fillfactor, use partitioning, reindex
Table Locks          | DDL (ALTER, VACUUM FULL) blocks writes     | Avoid table locks, prefer VACUUM ANALYZE
Row-Level Locks      | Frequent updates/deletes on the same rows  | Shorten transactions, use FOR UPDATE SKIP LOCKED
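
The last row deserves a closer look. Below is a hedged sketch of the FOR UPDATE SKIP LOCKED pattern for a work-queue workload, assuming a hypothetical jobs table with id, payload, and status columns:

def claim_job(conn):
    """Claim one pending job; rows already locked by other workers are skipped."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, payload FROM jobs
            WHERE status = 'pending'
            ORDER BY id
            FOR UPDATE SKIP LOCKED
            LIMIT 1
            """
        )
        row = cur.fetchone()  # None when every pending row is locked elsewhere
        if row:
            cur.execute("UPDATE jobs SET status = 'running' WHERE id = %s", (row[0],))
    conn.commit()  # commit releases the row lock
    return row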

Here’s an example of a PostgreSQL client with proper connection pooling using the ThreadSafeResource base class:

import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError

class PostgresClient(ThreadSafeResource):
    def __init__(self, dsn=None, minconn=1, maxconn=5, **conn_kwargs):
        """
        Initialize the PostgreSQL client with a connection pool.
        Provide either a DSN string or connection parameters (user, password, host, port, database).
        """
        # ThreadSafeResource.__init__ builds the pool and retry config exactly once,
        # so we forward the configuration instead of creating a second pool here.
        super().__init__(dsn=dsn, minconn=minconn, maxconn=maxconn, **conn_kwargs)

    def _create_pool(self, **config):
        """Implements connection pool creation."""
        return ThreadedConnectionPool(
            minconn=config.get("minconn", 1),
            maxconn=config.get("maxconn", 5),
            dsn=config.get("dsn"),
            **{k: v for k, v in config.items() if k not in ["minconn", "maxconn", "dsn"]}
        )

    def _configure_retries(self):
        """Configures retry settings for database operations."""
        return {
            "max_retries": 3,
            "initial_delay": 0.1,
            "max_delay": 5,
            "backoff_factor": 2
        }

    def _handle_error(self, error):
        """Handles errors by logging and potentially retrying."""
        print(f"Database error: {error}")

    def execute_query(self, query, params=None):
        """
        Execute a SQL query using a pooled connection.
        Returns fetched results for SELECT queries, or the number of affected rows for INSERT/UPDATE.
        Implements safe acquisition/release and basic retry logic.
        """
        retries = self.retry_config["max_retries"]
        delay = self.retry_config["initial_delay"]
        max_delay = self.retry_config["max_delay"]
        backoff_factor = self.retry_config["backoff_factor"]

        result = None
        conn = None
        for attempt in range(retries):
            try:
                with self._lock:
                    conn = self.pool.getconn()
            except PoolError:
                if attempt == retries - 1:
                    raise
                time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
                continue

            try:
                with conn.cursor() as cur:
                    cur.execute(query, params)
                    if cur.description:
                        result = cur.fetchall()
                    else:
                        result = cur.rowcount
                conn.commit()
                break
            except psycopg2.Error as db_err:
                conn.rollback()
                self._handle_error(db_err)
                with self._lock:
                    self.pool.putconn(conn, close=True)
                conn = None
                if attempt == retries - 1:
                    raise
                time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
                continue
            finally:
                if conn:
                    with self._lock:
                        self.pool.putconn(conn)

        return result

    def close(self):
        """Close all connections in the pool (to be called during application shutdown)."""
        with self._lock:
            self.pool.closeall()
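
A quick single-process usage sketch (the connection parameters are placeholders):

db = PostgresClient(
    host="localhost",
    port=5432,
    database="mydb",
    user="myuser",
    password="mypassword",
    minconn=2,
    maxconn=10
)
print(db.execute_query("SELECT version()"))
db.close()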

If multiple processes each create an instance of PostgresClient, each process gets its own independent connection pool: PostgreSQL connections cannot be shared across process boundaries, so without an external pooler such as PgBouncer you have to watch for connection exhaustion and clean up carefully. One client-side option is to host a single ThreadedConnectionPool in a multiprocessing manager process and hand worker processes a proxy to it. Be aware of the proxy's limits, though: every value returned through a proxy must be picklable, so this pattern needs careful testing before you rely on it.

from multiprocessing.managers import BaseManager

class PostgresPoolManager(BaseManager):
    pass

PostgresPoolManager.register('ThreadedConnectionPool', ThreadedConnectionPool)

class PostgresClient(ThreadSafeResource):
    _pool_manager = None
    _shared_pool = None

    def __init__(self, dsn=None, minconn=1, maxconn=5, **conn_kwargs):
        """
        Initialize the PostgreSQL client with a shared connection pool.
        Provide either a DSN string or connection parameters (user, password, host, port, database).
        """
        # The pool lives at class level (shared by every instance in this process
        # and, via the manager proxy, by forked workers), so we bypass
        # ThreadSafeResource.__init__ and guard initialization ourselves.
        if getattr(self, "_initialized", False):
            return
        with self._lock:
            if PostgresClient._pool_manager is None:
                PostgresClient._pool_manager = PostgresPoolManager()
                PostgresClient._pool_manager.start()

            if PostgresClient._shared_pool is None:
                PostgresClient._shared_pool = self._create_pool(
                    dsn=dsn, minconn=minconn, maxconn=maxconn, **conn_kwargs
                )
        self.pool = PostgresClient._shared_pool
        self.retry_config = self._configure_retries()
        self._initialized = True

    def _create_pool(self, **config):
        """Creates a shared connection pool using multiprocessing.Manager"""
        return self._pool_manager.ThreadedConnectionPool(
            minconn=config.get("minconn", 1),
            maxconn=config.get("maxconn", 5),
            dsn=config.get("dsn"),
            **{k: v for k, v in config.items() if k not in ["minconn", "maxconn", "dsn"]}
        )

    def _configure_retries(self):
        """Configures retry settings for database operations."""
        return {
            "max_retries": 3,
            "initial_delay": 0.1,
            "max_delay": 5,
            "backoff_factor": 2
        }

    def _handle_error(self, error):
        """Handles errors by logging and potentially retrying."""
        print(f"Database error: {error}")

    def execute_query(self, query, params=None):
        """
        Execute a SQL query using a pooled connection.
        Returns fetched results for SELECT queries, or the number of affected rows for INSERT/UPDATE.
        Implements safe acquisition/release and basic retry logic.
        """
        retries = self.retry_config["max_retries"]
        delay = self.retry_config["initial_delay"]
        max_delay = self.retry_config["max_delay"]
        backoff_factor = self.retry_config["backoff_factor"]

        result = None
        conn = None
        for attempt in range(retries):
            try:
                with self._lock:
                    conn = self.pool.getconn()
            except PoolError:
                if attempt == retries - 1:
                    raise
                time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
                continue

            try:
                with conn.cursor() as cur:
                    cur.execute(query, params)
                    if cur.description:
                        result = cur.fetchall()
                    else:
                        result = cur.rowcount
                conn.commit()
                break
            except psycopg2.Error as db_err:
                conn.rollback()
                self._handle_error(db_err)
                with self._lock:
                    self.pool.putconn(conn, close=True)
                conn = None
                if attempt == retries - 1:
                    raise
                time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
                continue
            finally:
                if conn:
                    with self._lock:
                        self.pool.putconn(conn)

        return result

    def close(self):
        """Close all connections in the pool (to be called during application shutdown)."""
        with self._lock:
            self.pool.closeall()
            if PostgresClient._pool_manager:
                PostgresClient._pool_manager.shutdown()
                PostgresClient._pool_manager = None
                PostgresClient._shared_pool = None

# Example usage with multiple processes. This relies on a fork start method
# (the default on Linux), so workers inherit the manager proxy created in the
# parent process:
from multiprocessing import Process, Queue
import random

def worker_process(worker_id, queue):
    # Re-instantiating the client here reuses the shared pool proxy inherited
    # from the parent instead of creating a new pool
    db = PostgresClient(
        host="localhost",
        port=5432,
        database="mydb",
        user="myuser",
        password="mypassword",
        minconn=2,
        maxconn=10
    )

    # Simulate some database operations
    for _ in range(3):
        # Insert a record
        user_id = db.execute_query(
            "INSERT INTO users (name, age) VALUES (%s, %s) RETURNING id",
            params=(f"User {worker_id}-{random.randint(1, 100)}", random.randint(20, 60))
        )

        # Query the inserted record
        result = db.execute_query(
            "SELECT * FROM users WHERE id = %s",
            params=(user_id[0][0],)
        )

        queue.put(f"Process {worker_id} inserted and queried: {result}")

def main():
    num_processes = 3
    result_queue = Queue()
    processes = []

    # Create the client (and with it the manager and shared pool) in the
    # parent before forking, so workers inherit the proxy
    db = PostgresClient(
        host="localhost",
        port=5432,
        database="mydb",
        user="myuser",
        password="mypassword",
        minconn=2,
        maxconn=10
    )

    # Start multiple processes
    for i in range(num_processes):
        p = Process(target=worker_process, args=(i, result_queue))
        processes.append(p)
        p.start()

    # Wait for all processes to complete
    for p in processes:
        p.join()

    # Print results
    while not result_queue.empty():
        print(result_queue.get())

    # Shut down the shared pool and manager exactly once, in the parent
    db.close()

if __name__ == "__main__":
    main()

RabbitMQ Connection Pooling

RabbitMQ is a message broker that uses the AMQP protocol to facilitate communication between producers and consumers. It is designed to handle large volumes of messages efficiently. However, managing connections to RabbitMQ can be challenging, especially in high-concurrency scenarios. Connection pooling can help alleviate some of these challenges by reusing connections and channels, reducing the overhead of establishing new connections for each operation.

In RabbitMQ, connections and channels are distinct concepts related to client interaction with the broker:

Connections:

  • A connection is a TCP connection between your application and the RabbitMQ broker
  • Each connection consumes system resources (sockets, memory, CPU)
  • Establishing a connection is expensive and should be done sparingly
  • Connections persist unless explicitly closed or if there’s a network failure

Channels:

  • A channel is a virtual connection within a single TCP connection
  • Channels allow multiple concurrent interactions (publishing, consuming) without opening new connections
  • They are lightweight and consume fewer resources than multiple TCP connections
  • A single connection can manage multiple channels for better efficiency, as the short sketch below shows
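
A minimal sketch of the distinction (it assumes a RabbitMQ broker on localhost and the pika library):

import pika

# One TCP connection...
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))

# ...carrying two lightweight virtual connections (channels)
publish_channel = connection.channel()
consume_channel = connection.channel()

publish_channel.queue_declare(queue="demo")
publish_channel.basic_publish(exchange="", routing_key="demo", body="hi")

# In a busy system the get could race the publish; fine for a demo
method, properties, body = consume_channel.basic_get(queue="demo", auto_ack=True)
print(body)  # b'hi'

connection.close()  # closes every channel on the connection

With those concepts in place, here is a RabbitMQ client built on the same ThreadSafeResource base class: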

import threading
import pika

class ThreadSafeRabbitMQClient(ThreadSafeResource):
    def _create_pool(self, **config):
        """Creates a connection pool with thread-safe management."""
        return RabbitMQConnectionPool(**config)

    def _configure_retries(self):
        """Defines retry settings for failed RabbitMQ operations."""
        return {
            "max_retries": 5,
            "initial_delay": 0.2,
            "max_delay": 5,
            "backoff_factor": 2
        }

    def _handle_error(self, error):
        """Handles errors in RabbitMQ operations."""
        print(f"Error in RabbitMQ operation: {error}")

class RabbitMQConnectionPool:
    """Manages RabbitMQ connections and channels in a thread-safe manner."""
    
    def __init__(
        self, 
        host='localhost', 
        port=5672, 
        max_connections=10
    ):
        self.host = host
        self.port = port
        self.max_connections = max_connections
        self.lock = threading.RLock()
        self.connections = []
        self._initialize_connections()

    def _initialize_connections(self):
        """Pre-creates a pool of RabbitMQ connections."""
        for _ in range(self.max_connections):
            conn = self._create_connection()
            if conn:
                self.connections.append(conn)

    def _create_connection(self):
        """Creates a new RabbitMQ connection."""
        try:
            params = pika.ConnectionParameters(
                host=self.host, 
                port=self.port
            )
            return pika.BlockingConnection(params)
        except Exception as e:
            print(f"Failed to create RabbitMQ connection: {e}")
            return None

    def get_resource(self):
        """Retrieves a connection and channel in a thread-safe manner."""
        with self.lock:
            if not self.connections:
                self._initialize_connections()
            if self.connections:
                conn = self.connections.pop()
                channel = conn.channel()
                return conn, channel
            else:
                raise Exception("No available RabbitMQ connections")

    def return_resource(self, resource):
        """Returns a connection to the pool, replacing it if it has gone stale."""
        conn, channel = resource
        with self.lock:
            if conn and conn.is_open:
                if channel.is_open:
                    channel.close()
                self.connections.append(conn)
            else:
                # Replace a dead connection, but never put None back in the pool
                replacement = self._create_connection()
                if replacement:
                    self.connections.append(replacement)

    def cleanup(self):
        """Closes all RabbitMQ connections on cleanup."""
        with self.lock:
            for conn in self.connections:
                if conn and conn.is_open:
                    conn.close()
            self.connections.clear()

# Usage Example:
def publish_message(resource, queue, message):
    """Publishes a message to RabbitMQ."""
    _, channel = resource
    channel.queue_declare(queue=queue)
    channel.basic_publish(
        exchange='',
        routing_key=queue,
        body=message
    )
    print(f"Published message: {message}")

if __name__ == "__main__":
    rabbitmq_client = ThreadSafeRabbitMQClient(
        host='localhost',
        port=5672,
        max_connections=5
    )

    # Publish messages with retries
    rabbitmq_client.execute_with_retry(
        publish_message,
        "test_queue",
        "Hello, RabbitMQ!"
    )
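
    # A hedged companion sketch: pull one message back with the same retry
    # helper; basic_get returns (None, None, None) when the queue is empty
    def consume_one(resource, queue):
        _, channel = resource
        _method, _props, message_body = channel.basic_get(queue=queue, auto_ack=True)
        return message_body

    print(rabbitmq_client.execute_with_retry(consume_one, "test_queue"))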
