Cooperative concurrency with FastAPI: it's the generators, stupid

Mar 27, 2024

Asyncio, wrote by Guido himself, is known to be complex: experienced programmers (1, 2) struggle with it. Reasonably so, half of the language is about async. Making your program concurrent is a big part of the job writing production-ready code, especially in the context of microservices. Doing async well with an ASGI framework (say fastapi) adds another layer of complexity. This article provides a practical guide. Our goal is to write responsive async endpoints on fastapi – getting there takes a bit of effort.

A word of caution. Just because you can doesn’t mean you should. A blocking function is better than a wrong function. Some code is better off blocking to be executed in a more deterministic fashion. This warrants a separate discussion.

Database Code Handles Concurrency through ACID, Not In-Process Synchronization

Whether or not we’ve managed to use threaded code or coroutines with implicit or explicit IO and find all the race conditions that would occur in our process, that matters not at all if the thing we’re talking to is a relational database, especially in today’s world where everything runs in clustered / horizontal / distributed ways - the handwringing of academic theorists regarding the non-deterministic nature of threads is just the tip of the iceberg; we need to deal with entirely distinct processes, and regardless of what’s said, non-determinism is here to stay.

For database code, you have exactly one technique to use in order to assure correct concurrency, and that is by using ACID-oriented constructs and techniques. These unfortunately don’t come magically or via any known silver bullet, though there are great tools that are designed to help steer you in the right direction.Mike Bayer, Asynchronous Python and Databases

Preamble: why async?

Raymond Hettinger on concurrency

Must be this tall
“Must be this tall to write multi-threaded code.” Sign installed by David Baron at Mozilla. Reported by Bobby Holley.
Threads have shared states and you need to manage racing conditions. Processes are independent and communicating between them could be expensive due to object pickling. Threads switch preemptively by the thread manager – critical sections need to be guarded with locks to avoid incoherent states. (As the GIL example below shows, threading is not easy.) Async switches cooperatively. The downside is you need to learn a non-blocking version of everything you do.

Coros

Coroutines are subroutines that can be suspended and resumed; subroutines are coroutines that run from start to end. Coroutines are concurrently executed (i.e., async) and subroutines are blocking (i.e., sync). Coroutines can be implemented with generators (@asyncio.coroutine and types.coroutine, see yield from syntax introduced in PEP 380). PEP 492 introduces the async keyword to define a native coroutine (which is still just a generator):

  • async def functions are always coroutines, even if they do not contain await expressions.
  • It is a SyntaxError to have yield or yield from expressions in an async function.
  • Internally, two new code object flags were introduced: CO_COROUTINE is used to mark native coroutines (defined with new syntax). CO_ITERABLE_COROUTINE is used to make generator-based coroutines compatible with native coroutines (set by types.coroutine() function).
  • Regular generators, when called, return a generator object; similarly, coroutines return a coroutine object.
  • StopIteration exceptions are not propagated out of coroutines, and are replaced with a RuntimeError. For regular generators such behavior requires a future import (see PEP 479).

The word “coroutine”, like the word “generator”, is used for two different (though related) concepts – identifying a map by its image:

  • The function that defines a coroutine (a function definition decorated with asyncio.coroutine). If disambiguation is needed we will call this a coroutine function.
  • The object obtained by calling a coroutine function. This object represents a computation or an I/O operation (usually a combination) that will complete eventually. If disambiguation is needed we will call it a coroutine object.

await suspends execution of a coroutine until an awaitable completes and returns the result. An awaitable can be one of the following:

  • A native coroutine object returned from a native coroutine function.
  • A generator-based coroutine object returned from a function decorated with types.coroutine().
  • An object with an __await__ method returning an iterator.
  • Any yield from chain of calls ends with a yield. This is a fundamental mechanism of how Futures are implemented. Since, internally, coroutines are a special kind of generators, every await is suspended by a yield somewhere down the chain of await calls (please refer to PEP 3156 for a detailed explanation). To enable this behavior for coroutines, a new magic method called __await__ is added. In asyncio, for instance, to enable Future objects in await statements, the only change is to add __await__ == __iter__ line to asyncio.Future class. Objects with __await__ method are called Future-like objects in the rest of this PEP.

GIL (Global Interpreter Lock)

GIL prevents parallel execution of bytecode, essentially pinning bytecode to one active thread. Global in python means per process. Python threads themselves are just normal OS-level pthreads, as opposed to green threads: host OS is responsible for scheduling them. GIL for example does not prevent these operations from running concurrently across multiple cores:

Threading comes with overheads. But it’s not as bad as what most people think:

Actually, your virtual stack size is 8388608 bytes (8 MB). Of course, it’s natural to conclude that this can’t be right, because that’s a ridiculously large amount of memory for every thread to consume for its stack when 99% of the time a couple of KB is probably all they need. The good news is that your thread only uses the amount of physical memory that it actually needs. This is one of the magical powers that your OS gets from using the hardware Memory Management Unit (MMU) in your processor. Here’s what happens:

  1. The OS allocates 8 MB of virtual memory for your stack by setting up the MMU’s page tables for your thread. This requires very little RAM to hold the page table entries only.
  2. When your thread runs and tries to access a virtual address on the stack that doesn’t have a physical page assigned to it yet, a hardware exception called a “page fault” is triggered by the MMU.
  3. The CPU core responds to the page fault exception by switching to a privileged execution mode (which has its own stack) and calling the page fault exception handler function inside the kernel.
  4. The kernel allocates a page of physical RAM to that virtual memory page and returns back to the user space thread.
  5. The user space thread sees none of that work. From its point of view, it just uses the stack as if the memory was there all along. Meanwhile, the stack automatically grows (or doesn’t) to meet the thread’s needs.

The MMU is a key part of the hardware of today’s computer systems. In particular, it’s responsible for a lot of the “magic” in the system, so I highly recommend learning more about what the MMU does, and about virtual memory in general. Also, if your application is performance sensitive and deals with a significant amount of data, you should understand how the TLB (the MMU’s page table cache) works and how you can restructure your data or your algorithms to maximize your TLB hit rate.

Python 3.2 introduced a new GIL. The “check interval” based on “ticks” is replaced by an absolute duration expressed in seconds. Some background on mutex (mutual exclusion) lock. A mutex can be released only by the thread that had acquired it. A binary semaphore can be signaled by any thread.

sys.setswitchinterval(interval:float = 0.005)

A thread needs to wait for wall time switchinterval before sending a gil_drop_request. “A primitive lock is in one of two states, “locked” or “unlocked”. It is created in the unlocked state. It has two basic methods, acquire() and release(). When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked, then the acquire() call resets it to locked and returns. The release() method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError will be raised.”
“A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. Passing one in is useful when several condition variables must share the same lock. The lock is part of the condition object: you don’t have to track it separately.
A condition variable obeys the context management protocol: using the with statement acquires the associated lock for the duration of the enclosed block. The acquire() and release() methods also call the corresponding methods of the associated lock.
Other methods must be called with the associated lock held. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout. The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notify_all() method wakes up all threads waiting for the condition variable.
Note: the notify() and notify_all() methods don’t release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notify_all() finally relinquishes ownership of the lock.
The typical programming style using condition variables uses the lock to synchronize access to some shared state; threads that are interested in a particular change of state call wait() repeatedly until they see the desired state, while threads that modify the state call notify() or notify_all() when they change the state in such a way that it could possibly be a desired state for one of the waiters.”
Wait until notified or until a timeout occurs. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
When the underlying lock is an RLock, it is not released using its release() method, since this may not actually unlock the lock when it was acquired multiple times recursively. Instead, an internal interface of the RLock class is used, which really unlocks it even when it has been recursively acquired several times. Another internal interface is then used to restore the recursion level when the lock is reacquired.
The return value is True unless a given timeout expired, in which case it is False.”

# Modified from https://rushter.com/blog/python-gil-thread-scheduling/
import threading
from types import SimpleNamespace

DEFAULT_INTERVAL = 0.05

gil_mutex = threading.RLock()
gil_cond = threading.Condition(lock=gil_mutex)
switch_cond = threading.Condition()

gil = SimpleNamespace(
    drop_request=False,
    locked=False,
    switch_number=0,
    last_holder=None,
    eval_breaker=True
)

# When Python creates a thread it calls take_gil before the thread enters the loop
def take_gil(thread_id):
    gil_cond.acquire()

    while gil.locked:
        saved_switchnum = gil.switch_number

        timed_out = not gil_cond.wait(timeout=DEFAULT_INTERVAL)

        if timed_out and gil.locked and gil.switch_number == saved_switchnum:
            gil.drop_request = True

    switch_cond.acquire()
    gil.locked = True

    if gil.last_holder != thread_id:
        gil.last_holder = thread_id
        gil.switch_number += 1

    switch_cond.notify()
    switch_cond.release()

    if gil.drop_request:
        gil.drop_request = False

    gil_cond.release()


def drop_gil(thread_id):
    if not gil.locked:
        raise Exception("GIL is not locked")

    gil_cond.acquire()

    gil.last_holder = thread_id
    gil.locked = False

    gil_cond.notify()
    gil_cond.release()
    if gil.drop_request:
        switch_cond.acquire()
        if gil.last_holder == thread_id:
            gil.drop_request = False
            switch_cond.wait()

        switch_cond.release()


# All threads run in the same execution_loop
def execution_loop(target_function, thread_id):
    bytecode = compile(target_function)

    while True:
        if gil.drop_request:
            drop_gil(thread_id)
            take_gil(thread_id)

        instruction = bytecode.next_instruction()
        if instruction is not None:
            execute_opcode(instruction)
        else:
            return
'''
https://github.com/python/cpython/blob/94c97423a9c4969f8ddd4a3aa4aacb99c4d5263d/Python/ceval_gil.c#L11

   Notes about the implementation:

   - The GIL is just a boolean variable (locked) whose access is protected
     by a mutex (gil_mutex), and whose changes are signalled by a condition
     variable (gil_cond). gil_mutex is taken for short periods of time,
     and therefore mostly uncontended.

   - In the GIL-holding thread, the main loop (PyEval_EvalFrameEx) must be
     able to release the GIL on demand by another thread. A volatile boolean
     variable (gil_drop_request) is used for that purpose, which is checked
     at every turn of the eval loop. That variable is set after a wait of
     `interval` microseconds on `gil_cond` has timed out.

      [Actually, another volatile boolean variable (eval_breaker) is used
       which ORs several conditions into one. Volatile booleans are
       sufficient as inter-thread signalling means since Python is run
       on cache-coherent architectures only.]

   - A thread wanting to take the GIL will first let pass a given amount of
     time (`interval` microseconds) before setting gil_drop_request. This
     encourages a defined switching period, but doesn't enforce it since
     opcodes can take an arbitrary time to execute.

     The `interval` value is available for the user to read and modify
     using the Python API `sys.{get,set}switchinterval()`.

   - When a thread releases the GIL and gil_drop_request is set, that thread
     ensures that another GIL-awaiting thread gets scheduled.
     It does so by waiting on a condition variable (switch_cond) until
     the value of last_holder is changed to something else than its
     own thread state pointer, indicating that another thread was able to
     take the GIL.

     This is meant to prohibit the latency-adverse behaviour on multi-core
     machines where one thread would speculatively release the GIL, but still
     run and end up being the first to re-acquire it, making the "timeslices"
     much longer than expected.
     (Note: this mechanism is enabled with FORCE_SWITCHING above)
'''

Even the new GIL isn’t perfect. IO-bound operations always release the lock and CPU-bound threads will always try to take back the lock: CPU-bound threads tend to hog the GIL, leaving IO threads stalling. The improvement over the previous implementation is that IO threads now at least periodically hold the lock after deterministic timeout, since the lock holding thread has to drop it. Note the thread that makes the drop request does not always get the lock later: the host OS assigns the thread execution order. See Larry Hastings’s ongoing gilectomy work.

Asyncio

Mostly from PEP 3156. When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon. A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation. Future object is awaited it means that the coroutine will wait until the Future is resolved in some other place. Future objects in asyncio are needed to allow callback-based code to be used with async/await.With e.g., add_done_callback. If the Future is already done when this method is called, the callback is scheduled with loop.call_soon(). Normally there is no need to create Future objects at the application level code.

asyncio synchronization primitives are designed to be similar to those of the threading module with two important caveats: not thread-safe and no timeout. The concurrent.futures module provides a high-level interface for asynchronously executing callables. asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.

Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second. An executor can be used to run a task in a different thread or even in a different process to avoid blocking the OS thread with the event loop. See the loop.run_in_executor() method for more details.

When a coroutine function is called, but not awaited (e.g. coro() instead of await coro()) or the coroutine is not scheduled with asyncio.create_task(), asyncio will emit a RuntimeWarning:

# coroutine arguments will be wrapped in Tasks
# returns a tuple of two sets of Futures
done, pending = asyncio.wait(fs, timeout=None, return_when=ALL_COMPLETED)

# returns an iterator whose values are Futures or coros
for f in asyncio.as_completed(fs):
    result = yield from f  # May raise an exception
    # use result
    ...

# wait for a single coro
asyncio.wait_for(f, timeout)

# returns a future which waits until all arguments (Futures or coros) are done
# and return a list of their corresponding results
asyncio.gather(f1, f2, ...)

Twisted

Grandpa of python concurrency libraries.Bite Code!, Asyncio, twisted, tornado, gevent walk into a bar…

Curio / Trio

curio (no longer in active development) grew out of David Beazley’s famous Die Threads presentation. Around the same time, NJS published Notes on Structured Concurrency, or: Go Statement Considered Harmful and gave us trio, taking inspiration from curio. “Trio is not compatible with asyncio, nor gevent or twisted by default. This means it’s also its little own async island.” You can however use AnyIO as the abstraction layer.

AnyIO

It implements trio-like structured concurrency (a good intro on SC) on top of asyncio and works in harmony with the native SC of trio itself. Starlette is based on AnyIO, so you can directly use AnyIO for advanced concurrency use cases. This is my preferred way to write async code.

Task groups

from anyio import sleep, create_task_group, run

async def sometask(num: int) -> None:
    print('Task', num, 'running')
    await sleep(1)
    print('Task', num, 'finished')

async def main() -> None:
    async with create_task_group() as tg:
        for num in range(5):
            tg.start_soon(sometask, num)

    print('All tasks finished!')

run(main)

Multiple errors

#3.11+
from anyio import create_task_group

try:
    async with create_task_group() as tg:
        tg.start_soon(some_task)
        tg.start_soon(another_task)
except* ValueError as excgroup:
    for exc in excgroup.exceptions:
        ...  # handle each ValueError
except* KeyError as excgroup:
    for exc in excgroup.exceptions:
        ...  # handle each KeyError

Timeout and shielding

from anyio import create_task_group, move_on_after, sleep, run

async def main():
    async with create_task_group() as tg:
        with move_on_after(1) as scope:
            print('Starting sleep')
            await sleep(2)
            print('This should never be printed')

        # The cancelled_caught property will be True if timeout was reached
        print('Exited cancel scope, cancelled =', scope.cancelled_caught)

run(main)

async def external_task():
    print('Started sleeping in the external task')
    await sleep(1)
    print('This line should never be seen')


async def main():
    async with create_task_group() as tg:
        with CancelScope(shield=True) as scope:
            tg.start_soon(external_task)
            tg.cancel_scope.cancel()
            print('Started sleeping in the host task')
            await sleep(1)
            print('Finished sleeping in the host task')

run(main)

Finalization

from anyio import CancelScope, create_task_group, sleep, run

async def external_task():
    print('Started sleeping in the external task')
    await sleep(1)
    print('This line should never be seen')

async def main():
    async with create_task_group() as tg:
        with CancelScope(shield=True) as scope:
            tg.start_soon(external_task)
            tg.cancel_scope.cancel()
            print('Started sleeping in the host task')
            await sleep(1)
            print('Finished sleeping in the host task')

run(main)

Running sync in work threads

This is what starlette/FastAPI use to run sync functions concurrently.The default AnyIO worker thread limiter has a value of 40, meaning that any calls to to_thread.run_sync() without an explicit limiter argument will cause a maximum of 40 threads to be spawned. You can adjust this limit like this: to_thread.current_default_thread_limiter().total_tokens = 60. AnyIO’s default thread pool limiter does not affect the default thread pool executor on asyncio.

import time
from anyio import to_thread, run

async def main():
    await to_thread.run_sync(time.sleep, 5)

run(main)

Jupyter

Since jupyter already has a running event loop, code like below will throw RuntimeError: asyncio.run() cannot be called from a running event loop. (Or you can simply await.)

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())

You could add the task to the current loop:

loop = asyncio.get_event_loop()
loop.create_task(main())

Aio LRU

Straightforward extension to coros but this illustrates the parallel worlds between sync functions and async functions.

class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        with (yield from self.lock):
            if self.done:
                return self.result
            self.result = yield from self.co.__await__()
            self.done = True
            return self.result

def cacheable(f):
    def wrapped(*args, **kwargs):
        r = f(*args, **kwargs)
        return Cacheable(r)
    return wrapped


@functools.lru_cache()
@cacheable
async def foo():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

AioHttp

The de facto default of making async requests in python, supposedly faster than httpx.

import aiohttp
import asyncio

params = {'key1': 'value1', 'key2': 'value2'}

async with aiohttp.ClientSession('http://httpbin.org') as session:
    async with session.get('/get', params=params):
        pass
    async with session.post('/post', json={'test': 'object'}):
        pass
    async with session.put('/put', data=b'data') as resp:
        print(await resp.json(content_type=None))

# With asyncio
urls = [
    'https://www.google.com',
    'https://www.facebook.com',
    'https://www.twitter.com'
]


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

AioPg

# Not Recommended - I'd try psycopg 3 instead in production
# https://www.psycopg.org/psycopg3/docs/advanced/async.html
import asyncio

async def create_table(conn):
    await conn.execute("DROP TABLE IF EXISTS tbl")
    await conn.execute(
        """CREATE TABLE tbl (
    id serial PRIMARY KEY,
    val varchar(255))"""
    )


from aiopg.sa import create_engine
async def go():
    async with create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    ) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await ...

AioBotocore

From AWS: “Unlike Resources and Sessions, clients are generally thread-safe.” Try to stick to a singleton client with the global session between threads. In case you see the KeyError: 'credential_provider'.

   def get_component(self, name):
      if name in self._deferred:
          factory = self._deferred[name]
          self._components[name] = factory()
          # Only delete the component from the deferred dict after
          # successfully creating the object from the factory as well as
          # injecting the instantiated value into the _components dict.
          del self._deferred[name]  <---- THIS IS WHERE IT FAILED.
      try:
          return self._components[name]
      except KeyError:
          raise ValueError("Unknown component: %s" % name)

The shared session (client instance) through DEFAULT_SESSION is mutated by multiple threads. The expected key credential_provider in the self._deferred dictionary is already deleted by other thread, so when the other thread tries to delete the key credential_provider, it fails to delete it since it’s already deleted.Gatsby Lee, Misunderstanding about thread-safe

# Not Recommended - I'd run boto3 with asyncio instead
import asyncio
from aiobotocore.session import get_session

AWS_ACCESS_KEY_ID = "xxx"
AWS_SECRET_ACCESS_KEY = "xxx"

async def go():
    bucket = 'dataintake'
    filename = 'dummy.bin'
    folder = 'aiobotocore'
    key = f'{folder}/{filename}'

    session = get_session()
    async with session.create_client(
        's3',
        region_name='us-west-2',
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
    ) as client:
        data = b'\x01' * 1024
        resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
        print(resp)

        resp = await client.get_object_acl(Bucket=bucket, Key=key)
        print(resp)

        resp = await client.get_object(Bucket=bucket, Key=key)
        async with resp['Body'] as stream:
            await stream.read()
            print(resp)

        resp = await client.delete_object(Bucket=bucket, Key=key)
        print(resp)


if __name__ == '__main__':
    asyncio.run(go())

Concurrency in FastAPI

I use flask for visualization tasks, particularly when incorporating dash is necessary. For most data-centric projects intended for production, I would lean towards FastAPI, as suggested by the “API” in its name. It performs well right from the start, and its integration with pydantic facilitates validation, proving especially useful for serialization and deserialization processes. I don’t particularly appreciate how it manages dependency injection – singletons tend to be more beneficial in a production environment.

Don’t block the event loop

Use async def endpoints if you have an await inside; otherwise, use def endpoints. Even if you use def instead of async def, it is still an async endpoint underneath“When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).
If you are coming from another async framework that does not work in the way described above and you are used to defining trivial compute-only path operation functions with plain def for a tiny performance gain (about 100 nanoseconds), please note that in FastAPI the effect would be quite opposite. In these cases, it’s better to use async def unless your path operation functions use code that performs blocking I/O.”
(via anyio’s worker thread). Don’t use async def if your coding is blocking, as it would block the event loop.

Comparable performance between Starlette and FastAPI.

Blocking code are sent in worker threads

# client.py
# SAME CLIENT THROUGHOUT THE EXAMPLES
import aiohttp
import asyncio
import time

n_requests = 50
urls = [f"http://localhost:5003/{i}" for i in range(n_requests)]


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(response)


loop = asyncio.get_event_loop()

start = time.time()
loop.run_until_complete(main())
end = time.time()
print(f"""# {n_requests:-3.0f} requests:
# {end - start: .4f}s Total   @{(end-start) / n_requests: .4f}s per req""")

# server.py
from fastapi import FastAPI
import threading
import asyncio
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
from time import sleep

app = FastAPI()


@app.on_event("startup")
def startup():
    print("start up ...")
    RunVar("_default_thread_limiter").set(CapacityLimiter(5))


@app.get("/{id}")
def root(id=0):
    sleep(1)
    print(threading.active_count())

    return {"message": f"Responding to request {id}"}

# server:
# Active thread count:  6
# ---
#  50 requests:
#  10.0962s Total   @ 0.2019s per req

Async code are run in the event loop

...

@app.get("/{id}")
async def root(id=0):
    await asyncio.sleep(1)
    print(threading.active_count())

    return {"message": f"Responding to request {id}"}

# server:
# Active thread count:  1
# ---
# client:
#  50 requests:
#  1.0388s Total   @ 0.0208s per req

Not to bad to run 1000 requests at 10ms per request. This is proportional to the sleep time.

# await asyncio.sleep(1)
# 1000 requests:
#  10.2364s Total   @ 0.0102s per req

# await asyncio.sleep(0.1)
# 1000 requests:
#  1.1680s Total   @ 0.0012s per req

# await asyncio.sleep(0.01)
# 1000 requests:
#  0.2465s Total   @ 0.0002s per req

Bad async code

Introducing a blocking code in the async function. This is a bad practice as it would block the event loop. The FastAPI server would still be able to handle requests but the response time would be significantly longer.

...

@app.get("/{id}")
async def root(id=0):
    sleep(1)
    print(threading.active_count())

    return {"message": f"Responding to request {id}"}

# Active thread count:  1
# ---
# client:
#  50 requests:
#  50.3450s Total   @ 1.0069s per req

Dangerously bad async code

Essentially, no await in the async code. Python would throw the following error but this is suppressed by FastAPI:

RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(1)
...

@app.get("/{id}")
async def root(id=0):
    asyncio.sleep(1)
    print("Active thread count: ", threading.active_count())

    return {"message": f"Responding to request {id}"}

# server:
# Active thread count:  1
# ---
# client:
#  50 requests:
#  0.0360s Total   @ 0.0007s per req

You would have similar performance if you use def instead of async def. Essentially, asyncio.sleep(1) returns a coroutine but it’s just ignored. You however would expect a SyntaxError if you use await in def:

    await asyncio.sleep(1)
    ^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: 'await' outside async function

Background tasks

Nothing special about background tasks: Starlette sends sync functions to the thread pool and async functions to the event loop. A blocking code in the event loop can block the entire server, same as before.

# client.py
import aiohttp
import asyncio
import time

host = "localhost"
port = 5010
base = f"http://{host}:{port}"

n_requests = 50
urls = [f"{base}/cpu_bound2"] + [f"{base}/{i}" for i in range(n_requests)]


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(response)


loop = asyncio.get_event_loop()

start = time.time()
loop.run_until_complete(main())
end = time.time()
print(
    f"""# {n_requests:-3.0f} requests:
# {end - start: .4f}s Total   @{(end-start) / n_requests: .4f}s per req"""
)

# server
from fastapi import FastAPI, BackgroundTasks
import threading
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
from time import sleep
import starlette.background as bg

app = FastAPI()

@app.on_event("startup")
def startup():
    print("start up ...")
    RunVar("_default_thread_limiter").set(CapacityLimiter(100))

# FastAPI style
@app.get("/cpu_bound")
async def cpu_bound(background_tasks: BackgroundTasks):
    background_tasks.add_task(sleep, 1)
    return {"message": "CPU bound task"}

# Starlette style
@app.get("/cpu_bound2")
async def cpu_bound():
    task = bg.BackgroundTasks()
    task.add_task(sleep, 1)
    return {"message": "CPU bound task"}

@app.get("/{id}")
async def root(id=0):
    await sleep(1)
    print(threading.active_count())

    return {"message": f"Responding to request {id}"}

# server:
# Active thread count:  1
# ---
# client:
#  50 requests:
#  1.0360s Total   @ 0.0207s per req

This is the entire starlette.background module:

# starlette.background.py
import asyncio
import sys
import typing

if sys.version_info >= (3, 10):  # pragma: no cover
    from typing import ParamSpec
else:  # pragma: no cover
    from typing_extensions import ParamSpec

from starlette.concurrency import run_in_threadpool

P = ParamSpec("P")
class BackgroundTask:
    def __init__(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)


class BackgroundTasks(BackgroundTask):
    def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
        self.tasks = list(tasks) if tasks else []

    def add_task(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

    async def __call__(self) -> None:
        for task in self.tasks:
            await task()

# starlette.concurrency.py
async def run_in_threadpool(
    func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    if kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)

Execution with celery worker and redis broker

If you really want to decouple CPU-bound tasks from the main event loop, you can use celery workers. This would make requests really fast: 0.7s total time for 1000 requests, 0.7ms per request.

# server.py
from fastapi import FastAPI, BackgroundTasks
import threading
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
import asyncio

# from anyio import sleep
import starlette.background as bg
from celery import Celery

app = FastAPI()

celery = Celery(
    __name__, broker="redis://127.0.0.1:6379/0", backend="redis://127.0.0.1:6379/0"
)

@app.on_event("startup")
def startup():
    print("start up ...")
    RunVar("_default_thread_limiter").set(CapacityLimiter(100))

@celery.task
def cpu_bound(x):
    import time

    time.sleep(1)
    return f"cpu_bound {x}"

@app.get("/{id}")
async def root(id=0):
    r = cpu_bound.delay(id)
    print(threading.active_count())

    return {"message": f"Responding to request {id} ({r.id})"}

# server:
# 1000 requests:
#  0.7436s Total   @ 0.0007s per req
#
# ---
# client:
# 2 worker with 20 concurrency each
#
# start: 2024-04-02 03:30:44.492
#   end: 2024-04-02 03:31:08.986
#
# 24.494s Total   @ 0.0245s per req

Performance tuning

Performance suggestions from Marcelo Trylesinski:

Epilog: why async?

Glyph:

Unfortunately, “asynchronous” systems have often been evangelized by emphasizing a somewhat dubious optimization which allows for a higher level of I/O-bound concurrency than with preemptive threads, rather than the problems with threading as a programming model that I’ve explained above. By characterizing “asynchronousness” in this way, it makes sense to lump all 4 choices together. I’ve been guilty of this myself, especially in years past: saying that a system using Twisted is more efficient than one using an alternative approach using threads. In many cases that’s been true, but:

  1. the situation is almost always more complicated than that, when it comes to performance,
  2. “context switching” is rarely a bottleneck in real-world programs, and
  3. it’s a bit of a distraction from the much bigger advantage of event-driven programming, which is simply that it’s easier to write programs at scale, in both senses (that is, programs containing lots of code as well as programs which have many concurrent users). A system that presents “implicit coroutines” – those which may transfer control to another concurrent task at any layer of the stack without any syntactic indication that this may happen – are simply the dubious optimization by itself.

← Back to all posts