Client-side connection pool management
Mar 1, 2025
Connection pooling patterns that work fine for small applications with simple resource access and few users can quickly become bottlenecks as your system scales. Even if individual operations remain simple, having many concurrent users accessing shared resources like PostgreSQL databases or RabbitMQ message queues can expose inefficiencies and limitations in your architecture. This post explores strategies for scaling connection pools to handle high concurrency, with a focus on client-side improvements. While we’ll use PostgreSQL and RabbitMQ as our primary examples, these patterns can help you manage any resource that requires connection pooling.
Common Issues
Thread Safety
When multiple threads access the database simultaneously, race conditions and data corruption can occur if connections aren’t properly synchronized. Connection pools must be thread-safe to prevent multiple threads from accessing the same connection simultaneously.
from abc import ABC, abstractmethod
import threading
from contextlib import contextmanager
from functools import wraps
import time
class ThreadSafeResource(ABC):
_instance = None
_lock = threading.RLock()
def __new__(cls, **config):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, **config):
with self._lock:
if not self._initialized:
self.pool = self._create_pool(**config)
self.retry_config = self._configure_retries()
self._initialized = True
@abstractmethod
def _create_pool(self, **config):
"""Create and return the connection pool"""
pass
@abstractmethod
def _configure_retries(self):
"""Configure and return retry settings"""
pass
@abstractmethod
def _handle_error(self, error):
"""Handle errors during resource usage"""
pass
@contextmanager
def get_resource(self):
resource = None
try:
with self._lock:
resource = self.pool.get_resource()
yield resource
        except Exception as e:
            self._handle_error(e)
            # Re-raise so callers (e.g. execute_with_retry) can apply their retry logic
            raise
finally:
if resource:
with self._lock:
self.pool.return_resource(resource)
def execute_with_retry(self, func, *args, **kwargs):
"""Execute a function with retry logic"""
retries = self.retry_config.get('max_retries', 3)
delay = self.retry_config.get('initial_delay', 0.1)
max_delay = self.retry_config.get('max_delay', 10)
backoff_factor = self.retry_config.get('backoff_factor', 2)
last_exception = None
for attempt in range(retries):
try:
with self.get_resource() as resource:
return func(resource, *args, **kwargs)
except Exception as e:
last_exception = e
if attempt == retries - 1:
raise last_exception
wait_time = min(delay * (backoff_factor ** attempt), max_delay)
time.sleep(wait_time)
continue
    def __del__(self):
        with self._lock:
            # Not every pool implementation exposes cleanup(); guard before calling it
            if hasattr(self, 'pool') and hasattr(self.pool, 'cleanup'):
                self.pool.cleanup()
Postgres Connection Pooling
Poor connection management leads to several problems (the sketch after this list illustrates the first two):
- Connection leaks: Failing to properly close connections
- Connection thrashing: Repeatedly opening/closing connections
- Connection starvation: Running out of available connections
- Idle connections: Keeping unused connections open
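As a small illustration of the first two issues, here is a sketch using psycopg2 directly; the users table and the connection parameters are placeholders, not part of any real schema:

import psycopg2

# Leaky pattern: a new connection per request that is never closed. Under load the
# server eventually runs out of connection slots (leak), and the constant
# connect/disconnect cycle adds latency (thrashing).
def handle_request_leaky(user_id):
    conn = psycopg2.connect(host="localhost", dbname="mydb", user="myuser", password="mypassword")
    with conn.cursor() as cur:
        cur.execute("SELECT name FROM users WHERE id = %s", (user_id,))
        return cur.fetchone()

# Safer pattern: always release the connection, even when the query fails.
def handle_request_safe(user_id):
    conn = psycopg2.connect(host="localhost", dbname="mydb", user="myuser", password="mypassword")
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT name FROM users WHERE id = %s", (user_id,))
            return cur.fetchone()
    finally:
        conn.close()

A pool takes this one step further: instead of opening and closing a connection per request, connections are reused across requests.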
PostgreSQL implements a “process per user” client/server model. In this model, every client process connects to exactly one backend process. As we do not know ahead of time how many connections will be made, we have to use a “supervisor process” that spawns a new backend process every time a connection is requested. This supervisor process is called postmaster and listens at a specified TCP/IP port for incoming connections. Whenever it detects a request for a connection, it spawns a new backend process. Those backend processes communicate with each other and with other processes of the instance using semaphores and shared memory to ensure data integrity throughout concurrent data access.
PostgreSQL creates a separate process for each connection rather than using a shared-thread model. This design choice means that there is significant CPU overhead from context switching when managing many concurrent connections. Even though PostgreSQL uses process-based concurrency, it still faces lock contention due to shared resources.
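You can observe the process-per-connection model directly: pg_stat_activity exposes one row, with its backend process id, per client connection. A minimal sketch with psycopg2, assuming local placeholder credentials:

import psycopg2

conn = psycopg2.connect(host="localhost", dbname="mydb", user="myuser", password="mypassword")
with conn.cursor() as cur:
    # One row per connection; pid is the backend process serving that connection
    cur.execute("SELECT pid, state, application_name FROM pg_stat_activity;")
    for pid, state, app_name in cur.fetchall():
        print(pid, state, app_name)
conn.close()

Open a few more client connections and rerun the query, and you will see a new backend pid appear for each one.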
| Source of Contention | Why It Happens | Mitigation |
|---|---|---|
| WAL Writes | Multiple processes compete for WAL buffers | Use synchronous_commit=off, optimize wal_writer_delay |
| Shared Buffers | Processes compete for hot pages | Increase shared_buffers, optimize effective_cache_size |
| Indexes | Index updates require page locks | Lower fillfactor, use partitioning, reindex |
| Table Locks | DDL (ALTER, VACUUM FULL) blocks writes | Avoid table locks, prefer VACUUM ANALYZE |
| Row-Level Locks | Frequent updates/deletes on the same rows | Reduce transaction time, use FOR UPDATE SKIP LOCKED |
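As a concrete example of the last mitigation, competing workers can skip rows that another transaction already has locked instead of blocking on them. A sketch of the usual queue-worker pattern, assuming a hypothetical jobs table and an open psycopg2 connection conn:

# Each worker claims a different pending job; rows locked by other workers are skipped, not waited on
claim_job_sql = """
    UPDATE jobs
    SET status = 'in_progress'
    WHERE id = (
        SELECT id FROM jobs
        WHERE status = 'pending'
        ORDER BY id
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING id;
"""
with conn.cursor() as cur:
    cur.execute(claim_job_sql)
    job = cur.fetchone()  # None when no unclaimed pending job is available
conn.commit()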
The practical impact on capacity is significant:
- Without pooling: roughly 300–500 client connections before performance drops; tune max_connections, work_mem, and other server settings.
- With pooling: 10,000+ clients can be handled effectively, using a pooler such as PgBouncer or Odyssey.
Here’s an example of a PostgreSQL client with proper connection pooling using the ThreadSafeResource base class:
import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError

class PostgresClient(ThreadSafeResource):
    def __init__(self, dsn=None, minconn=1, maxconn=5, **conn_kwargs):
        """
        Initialize the PostgreSQL client with a connection pool.
        Provide either a DSN string or connection parameters (user, password, host, port, database).
        """
        # Let the base class create the pool and retry settings exactly once
        super().__init__(dsn=dsn, minconn=minconn, maxconn=maxconn, **conn_kwargs)
def _create_pool(self, **config):
"""Implements connection pool creation."""
return ThreadedConnectionPool(
minconn=config.get("minconn", 1),
maxconn=config.get("maxconn", 5),
dsn=config.get("dsn"),
**{k: v for k, v in config.items() if k not in ["minconn", "maxconn", "dsn"]}
)
def _configure_retries(self):
"""Configures retry settings for database operations."""
return {
"max_retries": 3,
"initial_delay": 0.1,
"max_delay": 5,
"backoff_factor": 2
}
def _handle_error(self, error):
"""Handles errors by logging and potentially retrying."""
print(f"Database error: {error}")
def execute_query(self, query, params=None):
"""
Execute a SQL query using a pooled connection.
Returns fetched results for SELECT queries, or the number of affected rows for INSERT/UPDATE.
Implements safe acquisition/release and basic retry logic.
"""
retries = self.retry_config["max_retries"]
delay = self.retry_config["initial_delay"]
max_delay = self.retry_config["max_delay"]
backoff_factor = self.retry_config["backoff_factor"]
result = None
conn = None
for attempt in range(retries):
try:
with self._lock:
conn = self.pool.getconn()
except PoolError:
if attempt == retries - 1:
raise
time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
continue
try:
with conn.cursor() as cur:
cur.execute(query, params)
if cur.description:
result = cur.fetchall()
else:
result = cur.rowcount
conn.commit()
break
except psycopg2.Error as db_err:
conn.rollback()
self._handle_error(db_err)
with self._lock:
self.pool.putconn(conn, close=True)
conn = None
if attempt == retries - 1:
raise
time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
continue
finally:
if conn:
with self._lock:
self.pool.putconn(conn)
return result
def close(self):
"""Close all connections in the pool (to be called during application shutdown)."""
with self._lock:
self.pool.closeall()
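A minimal usage sketch (the connection parameters and the users table are placeholders):

db = PostgresClient(
    host="localhost",
    port=5432,
    database="mydb",
    user="myuser",
    password="mypassword",
    minconn=1,
    maxconn=5
)
try:
    rows = db.execute_query("SELECT id, name FROM users WHERE age > %s", params=(30,))
    for row in rows:
        print(row)
finally:
    db.close()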
If multiple processes each create an instance of PostgresClient, every process ends up with its own independent connection pool. PostgreSQL connections cannot be shared across processes, so without PgBouncer or another external pooler you need to watch out for connection exhaustion and make sure each pool is cleaned up. Since a ThreadedConnectionPool cannot be shared across processes directly, one option is to use a multiprocessing manager to expose a single pool to all workers; for the pool to actually be shared, the manager has to be started in the parent process before the workers are created.
from multiprocessing import Manager
from multiprocessing.managers import BaseManager
class PostgresPoolManager(BaseManager):
pass
PostgresPoolManager.register('ThreadedConnectionPool', ThreadedConnectionPool)
class PostgresClient(ThreadSafeResource):
_pool_manager = None
_shared_pool = None
    def __init__(self, dsn=None, minconn=1, maxconn=5, **conn_kwargs):
        """
        Initialize the PostgreSQL client with a shared connection pool.
        Provide either a DSN string or connection parameters (user, password, host, port, database).
        """
        with self._lock:
            # Start the manager once, before any pool is created through it
            if PostgresClient._pool_manager is None:
                PostgresClient._pool_manager = PostgresPoolManager()
                PostgresClient._pool_manager.start()
            if PostgresClient._shared_pool is None:
                PostgresClient._shared_pool = self._create_pool(
                    dsn=dsn, minconn=minconn, maxconn=maxconn, **conn_kwargs
                )
        self.pool = PostgresClient._shared_pool
        self.retry_config = self._configure_retries()
def _create_pool(self, **config):
"""Creates a shared connection pool using multiprocessing.Manager"""
return self._pool_manager.ThreadedConnectionPool(
minconn=config.get("minconn", 1),
maxconn=config.get("maxconn", 5),
dsn=config.get("dsn"),
**{k: v for k, v in config.items() if k not in ["minconn", "maxconn", "dsn"]}
)
def _configure_retries(self):
"""Configures retry settings for database operations."""
return {
"max_retries": 3,
"initial_delay": 0.1,
"max_delay": 5,
"backoff_factor": 2
}
def _handle_error(self, error):
"""Handles errors by logging and potentially retrying."""
print(f"Database error: {error}")
def execute_query(self, query, params=None):
"""
Execute a SQL query using a pooled connection.
Returns fetched results for SELECT queries, or the number of affected rows for INSERT/UPDATE.
Implements safe acquisition/release and basic retry logic.
"""
retries = self.retry_config["max_retries"]
delay = self.retry_config["initial_delay"]
max_delay = self.retry_config["max_delay"]
backoff_factor = self.retry_config["backoff_factor"]
result = None
conn = None
for attempt in range(retries):
try:
with self._lock:
conn = self.pool.getconn()
except PoolError:
if attempt == retries - 1:
raise
time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
continue
try:
with conn.cursor() as cur:
cur.execute(query, params)
if cur.description:
result = cur.fetchall()
else:
result = cur.rowcount
conn.commit()
break
except psycopg2.Error as db_err:
conn.rollback()
self._handle_error(db_err)
with self._lock:
self.pool.putconn(conn, close=True)
conn = None
if attempt == retries - 1:
raise
time.sleep(min(delay * (backoff_factor ** attempt), max_delay))
continue
finally:
if conn:
with self._lock:
self.pool.putconn(conn)
return result
def close(self):
"""Close all connections in the pool (to be called during application shutdown)."""
with self._lock:
self.pool.closeall()
if PostgresClient._pool_manager:
PostgresClient._pool_manager.shutdown()
PostgresClient._pool_manager = None
PostgresClient._shared_pool = None
# Example usage with multiple processes:
from multiprocessing import Process, Queue
import random
def worker_process(worker_id, queue):
    # Initialize the client; workers are intended to share the same manager-backed pool
    # (see the note above about starting the manager in the parent process)
db = PostgresClient(
host="localhost",
port=5432,
database="mydb",
user="myuser",
password="mypassword",
minconn=2,
maxconn=10
)
try:
# Simulate some database operations
for _ in range(3):
# Insert a record
user_id = db.execute_query(
"INSERT INTO users (name, age) VALUES (%s, %s) RETURNING id",
params=(f"User {worker_id}-{random.randint(1,100)}", random.randint(20,60))
)
# Query the inserted record
result = db.execute_query(
"SELECT * FROM users WHERE id = %s",
params=(user_id[0][0],)
)
queue.put(f"Process {worker_id} inserted and queried: {result}")
finally:
db.close()
def main():
num_processes = 3
result_queue = Queue()
processes = []
# Start multiple processes
for i in range(num_processes):
p = Process(target=worker_process, args=(i, result_queue))
processes.append(p)
p.start()
# Wait for all processes to complete
for p in processes:
p.join()
# Print results
while not result_queue.empty():
print(result_queue.get())
if __name__ == "__main__":
main()
RabbitMQ Connection Pooling
RabbitMQ is a message broker that uses the AMQP protocol to facilitate communication between producers and consumers. It is designed to handle large volumes of messages efficiently. However, managing connections to RabbitMQ can be challenging, especially in high-concurrency scenarios. Connection pooling can help alleviate some of these challenges by reusing connections and channels, reducing the overhead of establishing new connections for each operation.
In RabbitMQ, connections and channels are distinct concepts related to client interaction with the broker:
Connections:
- A connection is a TCP connection between your application and the RabbitMQ broker
- Each connection consumes system resources (sockets, memory, CPU)
- Establishing a connection is expensive and should be done sparingly
- Connections persist unless explicitly closed or if there’s a network failure
Channels:
- A channel is a virtual connection within a single TCP connection
- Channels allow multiple concurrent interactions (publishing, consuming) without opening new connections
- They are lightweight and consume fewer resources than multiple TCP connections
- A single connection can manage multiple channels for better efficiency (see the sketch below)
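For instance, one pika connection can carry separate channels for publishing and consuming. A minimal sketch, assuming a broker on localhost (the queue name is a placeholder):

import pika

# One TCP connection, two lightweight channels multiplexed over it
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
publish_channel = connection.channel()
consume_channel = connection.channel()

publish_channel.queue_declare(queue='demo_queue')
publish_channel.basic_publish(exchange='', routing_key='demo_queue', body='hello')

# basic_get returns (None, None, None) when the queue is empty
method, properties, body = consume_channel.basic_get(queue='demo_queue', auto_ack=True)
print(body)

connection.close()

The pool below builds on the same primitives, handing out a (connection, channel) pair per request: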
from abc import ABC, abstractmethod
import threading
import time
import pika
from contextlib import contextmanager
class ThreadSafeRabbitMQClient(ThreadSafeResource):
def _create_pool(self, **config):
"""Creates a connection pool with thread-safe management."""
return RabbitMQConnectionPool(**config)
def _configure_retries(self):
"""Defines retry settings for failed RabbitMQ operations."""
return {
"max_retries": 5,
"initial_delay": 0.2,
"max_delay": 5,
"backoff_factor": 2
}
def _handle_error(self, error):
"""Handles errors in RabbitMQ operations."""
print(f"Error in RabbitMQ operation: {error}")
class RabbitMQConnectionPool:
"""Manages RabbitMQ connections and channels in a thread-safe manner."""
def __init__(
self,
host='localhost',
port=5672,
max_connections=10
):
self.host = host
self.port = port
self.max_connections = max_connections
self.lock = threading.RLock()
self.connections = []
self._initialize_connections()
def _initialize_connections(self):
"""Pre-creates a pool of RabbitMQ connections."""
for _ in range(self.max_connections):
conn = self._create_connection()
if conn:
self.connections.append(conn)
def _create_connection(self):
"""Creates a new RabbitMQ connection."""
try:
params = pika.ConnectionParameters(
host=self.host,
port=self.port
)
return pika.BlockingConnection(params)
except Exception as e:
print(f"Failed to create RabbitMQ connection: {e}")
return None
def get_resource(self):
"""Retrieves a connection and channel in a thread-safe manner."""
with self.lock:
if not self.connections:
self._initialize_connections()
if self.connections:
conn = self.connections.pop()
channel = conn.channel()
return conn, channel
else:
raise Exception("No available RabbitMQ connections")
    def return_resource(self, resource):
        """Returns a connection to the pool, replacing it if it has died."""
        conn, channel = resource
        with self.lock:
            if conn and conn.is_open:
                if channel.is_open:
                    channel.close()
                self.connections.append(conn)
            else:
                # Replace the dead connection, but never put None back in the pool
                replacement = self._create_connection()
                if replacement:
                    self.connections.append(replacement)
def cleanup(self):
"""Closes all RabbitMQ connections on cleanup."""
with self.lock:
for conn in self.connections:
if conn and conn.is_open:
conn.close()
self.connections.clear()
# Usage Example:
def publish_message(resource, queue, message):
"""Publishes a message to RabbitMQ."""
_, channel = resource
channel.queue_declare(queue=queue)
channel.basic_publish(
exchange='',
routing_key=queue,
body=message
)
print(f"Published message: {message}")
if __name__ == "__main__":
rabbitmq_client = ThreadSafeRabbitMQClient(
host='localhost',
port=5672,
max_connections=5
)
# Publish messages with retries
rabbitmq_client.execute_with_retry(
publish_message,
"test_queue",
"Hello, RabbitMQ!"
)
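Consuming can reuse the same pool and retry logic. A short sketch continuing the example above (consume_one is a helper introduced here, not part of the client):

def consume_one(resource, queue):
    """Fetches a single message from the queue, acknowledging it automatically."""
    _, channel = resource
    channel.queue_declare(queue=queue)
    # basic_get returns (None, None, None) when the queue is empty
    method, properties, body = channel.basic_get(queue=queue, auto_ack=True)
    return body

message = rabbitmq_client.execute_with_retry(consume_one, "test_queue")
print(f"Consumed message: {message}")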