Coroutines are subroutines that can be suspended and resumed; subroutines are coroutines that run from start to end. Coroutines enable concurrent (i.e., async) execution, while subroutines are blocking (i.e., sync). Coroutines can be implemented with generators (@asyncio.coroutine and types.coroutine; see the yield from syntax introduced in PEP 380). PEP 492 introduces the async keyword to define a native coroutine (internally still built on generator machinery):
- async def functions are always coroutines, even if they do not contain await expressions.
- It is a SyntaxError to have yield or yield from expressions in an async def function.
- Internally, two new code object flags were introduced: CO_COROUTINE marks native coroutines (defined with the new syntax); CO_ITERABLE_COROUTINE makes generator-based coroutines compatible with native coroutines (set by the types.coroutine() function).
- Regular generators, when called, return a generator object; similarly, coroutines return a coroutine object when called.
- StopIteration exceptions are not propagated out of coroutines and are replaced with a RuntimeError. For regular generators such behavior requires a future import (see PEP 479).
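A minimal sketch of the two flavors (the bare yield only exists to force a suspension point; the names are illustrative):
import types

@types.coroutine
def gen_based():            # generator-based coroutine (CO_ITERABLE_COROUTINE)
    yield                   # suspension point

async def native():         # native coroutine (CO_COROUTINE)
    await gen_based()

c = native()                # calling it only creates a coroutine object
print(type(c))              # <class 'coroutine'>
c.send(None)                # advance to the first suspension, like a generator
c.close()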
The word “coroutine”, like the word “generator”, is used for two different (though related) concepts – identifying a map by its image:
- The function that defines a coroutine (a function definition decorated with asyncio.coroutine). If disambiguation is needed we will call this a coroutine function.
- The object obtained by calling a coroutine function. This object represents a computation or an I/O operation (usually a combination) that will complete eventually. If disambiguation is needed we will call it a coroutine object.
await suspends execution of a coroutine until an awaitable completes, and returns the result. An awaitable can be one of the following:
- A native coroutine object returned from a native coroutine function.
- A generator-based coroutine object returned from a function decorated with types.coroutine().
- An object with an __await__ method returning an iterator.
Any yield from chain of calls ends with a yield. This is a fundamental mechanism of how Futures are implemented. Since, internally, coroutines are a special kind of generator, every await is suspended by a yield somewhere down the chain of await calls (please refer to PEP 3156 for a detailed explanation). To enable this behavior for coroutines, a new magic method called __await__ is added. In asyncio, for instance, to enable Future objects in await statements, the only change is to add a __await__ = __iter__ line to the asyncio.Future class. Objects with an __await__ method are called Future-like objects in the rest of this PEP.
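A minimal Future-like object, driven by hand the way an event loop would (FutureLike and the send() calls are purely illustrative):
class FutureLike:
    def __init__(self, value):
        self.value = value
    def __await__(self):            # must return an iterator
        yield                       # suspend once, as a pending Future would
        return self.value

async def main():
    print(await FutureLike(42))

coro = main()
coro.send(None)                     # runs until the yield inside __await__
try:
    coro.send(None)                 # resume: prints 42, then raises StopIteration
except StopIteration:
    pass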
The GIL prevents parallel execution of Python bytecode, essentially pinning bytecode execution to one active thread at a time. "Global" in Python means per process. Python threads themselves are just normal OS-level threads (pthreads), as opposed to green threads: the host OS is responsible for scheduling them. Because the following operations release the GIL, it does not prevent them from running concurrently across multiple cores (a rough demonstration follows the list):
- IO: networking or file handling
- Built-in CPU-bound operations, such as hashing or compressing
- Some C extensions, such as numpy
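A rough, machine-dependent way to observe this: hashing a large buffer releases the GIL, so threads give a near-linear speedup (a sketch, not a rigorous benchmark):
import hashlib
import threading
import time

data = b"x" * (256 * 1024 * 1024)     # large buffer; hashlib releases the GIL

def work():
    hashlib.sha256(data).hexdigest()

start = time.perf_counter()
for _ in range(4):
    work()
print("serial:  ", time.perf_counter() - start)

start = time.perf_counter()
threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded:", time.perf_counter() - start)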
Threading comes with overheads, but it's not as bad as most people think:
Actually, your virtual stack size is 8388608 bytes (8 MB). Of course, it’s natural to conclude that this can’t be right, because that’s a ridiculously large amount of memory for every thread to consume for its stack when 99% of the time a couple of KB is probably all they need. The good news is that your thread only uses the amount of physical memory that it actually needs. This is one of the magical powers that your OS gets from using the hardware Memory Management Unit (MMU) in your processor. Here’s what happens:
- The OS allocates 8 MB of virtual memory for your stack by setting up the MMU’s page tables for your thread. This requires very little RAM to hold the page table entries only.
- When your thread runs and tries to access a virtual address on the stack that doesn’t have a physical page assigned to it yet, a hardware exception called a “page fault” is triggered by the MMU.
- The CPU core responds to the page fault exception by switching to a privileged execution mode (which has its own stack) and calling the page fault exception handler function inside the kernel.
- The kernel allocates a page of physical RAM to that virtual memory page and returns back to the user space thread.
- The user space thread sees none of that work. From its point of view, it just uses the stack as if the memory was there all along. Meanwhile, the stack automatically grows (or doesn’t) to meet the thread’s needs.
The MMU is a key part of the hardware of today’s computer systems. In particular, it’s responsible for a lot of the “magic” in the system, so I highly recommend learning more about what the MMU does, and about virtual memory in general. Also, if your application is performance sensitive and deals with a significant amount of data, you should understand how the TLB (the MMU’s page table cache) works and how you can restructure your data or your algorithms to maximize your TLB hit rate.
Python 3.2 introduced a new GIL. The "check interval" based on "ticks" was replaced by an absolute duration expressed in seconds. Some background on mutex (mutual exclusion) locks: a mutex can be released only by the thread that acquired it, whereas a binary semaphore can be signaled by any thread.
sys.setswitchinterval(interval: float)  # the default switch interval is 0.005 s (5 ms)
A thread needs to wait for wall time switchinterval before sending a gil_drop_request. “A primitive lock is in one of two states, “locked” or “unlocked”. It is created in the unlocked state. It has two basic methods, acquire() and release(). When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked, then the acquire() call resets it to locked and returns. The release() method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError will be raised.”
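A minimal illustration of those Lock semantics:
import threading

lock = threading.Lock()
lock.acquire()              # unlocked -> locked, returns immediately
print(lock.locked())        # True
lock.release()              # locked -> unlocked
try:
    lock.release()          # releasing an unlocked lock
except RuntimeError as e:
    print(e)                # "release unlocked lock"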
“A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. Passing one in is useful when several condition variables must share the same lock. The lock is part of the condition object: you don’t have to track it separately.
A condition variable obeys the context management protocol: using the with statement acquires the associated lock for the duration of the enclosed block. The acquire() and release() methods also call the corresponding methods of the associated lock.
Other methods must be called with the associated lock held. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout. The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notify_all() method wakes up all threads waiting for the condition variable.
Note: the notify() and notify_all() methods don’t release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notify_all() finally relinquishes ownership of the lock.
The typical programming style using condition variables uses the lock to synchronize access to some shared state; threads that are interested in a particular change of state call wait() repeatedly until they see the desired state, while threads that modify the state call notify() or notify_all() when they change the state in such a way that it could possibly be a desired state for one of the waiters.”
“Wait until notified or until a timeout occurs. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
When the underlying lock is an RLock, it is not released using its release() method, since this may not actually unlock the lock when it was acquired multiple times recursively. Instead, an internal interface of the RLock class is used, which really unlocks it even when it has been recursively acquired several times. Another internal interface is then used to restore the recursion level when the lock is reacquired.
The return value is True unless a given timeout expired, in which case it is False.”
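A small sketch of that typical style, with one producer and one consumer guarding a shared list:
import threading

cv = threading.Condition()          # creates its own Lock internally
items = []

def consumer():
    with cv:                        # acquires the associated lock
        while not items:            # wait() in a loop until the desired state
            cv.wait()
        print("got", items.pop())

def producer():
    with cv:
        items.append("an_item")
        cv.notify()                 # wake one waiter; lock released at "with" exit

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
The pseudocode below sketches how the new GIL is built from exactly these primitives (an RLock plus condition variables):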
# Modified from https://rushter.com/blog/python-gil-thread-scheduling/
import threading
from types import SimpleNamespace
DEFAULT_INTERVAL = 0.05
gil_mutex = threading.RLock()
gil_cond = threading.Condition(lock=gil_mutex)
switch_cond = threading.Condition()
gil = SimpleNamespace(
drop_request=False,
locked=False,
switch_number=0,
last_holder=None,
eval_breaker=True
)
# When Python creates a thread it calls take_gil before the thread enters the loop
def take_gil(thread_id):
gil_cond.acquire()
while gil.locked:
saved_switchnum = gil.switch_number
timed_out = not gil_cond.wait(timeout=DEFAULT_INTERVAL)
if timed_out and gil.locked and gil.switch_number == saved_switchnum:
gil.drop_request = True
switch_cond.acquire()
gil.locked = True
if gil.last_holder != thread_id:
gil.last_holder = thread_id
gil.switch_number += 1
switch_cond.notify()
switch_cond.release()
if gil.drop_request:
gil.drop_request = False
gil_cond.release()
def drop_gil(thread_id):
if not gil.locked:
raise Exception("GIL is not locked")
gil_cond.acquire()
gil.last_holder = thread_id
gil.locked = False
gil_cond.notify()
gil_cond.release()
if gil.drop_request:
switch_cond.acquire()
if gil.last_holder == thread_id:
gil.drop_request = False
switch_cond.wait()
switch_cond.release()
# All threads run in the same execution_loop
def execution_loop(target_function, thread_id):
bytecode = compile(target_function)
while True:
if gil.drop_request:
drop_gil(thread_id)
take_gil(thread_id)
instruction = bytecode.next_instruction()
if instruction is not None:
execute_opcode(instruction)
else:
return
'''
https://github.com/python/cpython/blob/94c97423a9c4969f8ddd4a3aa4aacb99c4d5263d/Python/ceval_gil.c#L11
Notes about the implementation:
- The GIL is just a boolean variable (locked) whose access is protected
by a mutex (gil_mutex), and whose changes are signalled by a condition
variable (gil_cond). gil_mutex is taken for short periods of time,
and therefore mostly uncontended.
- In the GIL-holding thread, the main loop (PyEval_EvalFrameEx) must be
able to release the GIL on demand by another thread. A volatile boolean
variable (gil_drop_request) is used for that purpose, which is checked
at every turn of the eval loop. That variable is set after a wait of
`interval` microseconds on `gil_cond` has timed out.
[Actually, another volatile boolean variable (eval_breaker) is used
which ORs several conditions into one. Volatile booleans are
sufficient as inter-thread signalling means since Python is run
on cache-coherent architectures only.]
- A thread wanting to take the GIL will first let pass a given amount of
time (`interval` microseconds) before setting gil_drop_request. This
encourages a defined switching period, but doesn't enforce it since
opcodes can take an arbitrary time to execute.
The `interval` value is available for the user to read and modify
using the Python API `sys.{get,set}switchinterval()`.
- When a thread releases the GIL and gil_drop_request is set, that thread
ensures that another GIL-awaiting thread gets scheduled.
It does so by waiting on a condition variable (switch_cond) until
the value of last_holder is changed to something else than its
own thread state pointer, indicating that another thread was able to
take the GIL.
This is meant to prohibit the latency-adverse behaviour on multi-core
machines where one thread would speculatively release the GIL, but still
run and end up being the first to re-acquire it, making the "timeslices"
much longer than expected.
(Note: this mechanism is enabled with FORCE_SWITCHING above)
'''
Even the new GIL isn’t perfect. IO-bound operations always release the lock, and CPU-bound threads always try to take it back: CPU-bound threads tend to hog the GIL, leaving IO threads stalling. The improvement over the previous implementation is that IO threads now at least periodically get the lock after a deterministic timeout, since the GIL-holding thread has to drop it. Note that the thread that makes the drop request does not always get the lock next: the host OS decides the thread execution order. See Larry Hastings’s gilectomy work.
Mostly from PEP 3156. When a coroutine is wrapped into a Task with functions like asyncio.create_task(), the coroutine is automatically scheduled to run soon. A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation. When a Future object is awaited, it means that the coroutine will wait until the Future is resolved in some other place. Future objects in asyncio are needed to allow callback-based code to be used with async/await, e.g. via add_done_callback(). If the Future is already done when this method is called, the callback is scheduled with loop.call_soon(). Normally there is no need to create Future objects at the application level.
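A small illustration of bridging a callback to await; the 0.1 s call_later just stands in for "resolved in some other place":
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.add_done_callback(lambda f: print("callback saw:", f.result()))
    loop.call_later(0.1, fut.set_result, 42)   # resolve it from "somewhere else"
    print("awaited:", await fut)

asyncio.run(main())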
asyncio synchronization primitives are designed to be similar to those of the threading module, with two important caveats: they are not thread-safe, and their methods do not accept a timeout argument (use asyncio.wait_for() instead). The concurrent.futures module provides a high-level interface for asynchronously executing callables. asyncio queues are designed to be similar to the classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second. An executor can be used to run a task in a different thread or even in a different process, to avoid blocking the OS thread running the event loop. See the loop.run_in_executor() method for more details.
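A sketch of run_in_executor for both flavors (a thread for a blocking call, a process pool for CPU-bound work; cpu_heavy is just a stand-in workload):
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    # None -> the loop's default ThreadPoolExecutor
    await loop.run_in_executor(None, time.sleep, 1)
    # a process pool sidesteps the GIL for CPU-bound work
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())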
When a coroutine function is called but not awaited (e.g. coro() instead of await coro()), or the coroutine is not scheduled with asyncio.create_task(), asyncio will emit a RuntimeWarning:
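For instance (the names here are only illustrative):
import asyncio

async def fetch_data():
    await asyncio.sleep(1)

async def main():
    fetch_data()        # RuntimeWarning: coroutine 'fetch_data' was never awaited

asyncio.run(main())
The PEP 3156-era helpers for waiting on one or many awaitables: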
# coroutine arguments will be wrapped in Tasks
# returns a tuple of two sets of Futures
done, pending = await asyncio.wait(fs, timeout=None, return_when=asyncio.ALL_COMPLETED)

# returns an iterator whose values are Futures or coros
for f in asyncio.as_completed(fs):
    result = await f  # may raise an exception
    # use result
    ...

# wait for a single awaitable, with a timeout
result = await asyncio.wait_for(f, timeout)

# returns a future that waits until all arguments (Futures or coros) are done
# and resolves to a list of their corresponding results
results = await asyncio.gather(f1, f2, ...)
Grandpa of Python concurrency libraries. See Bite Code!, “Asyncio, twisted, tornado, gevent walk into a bar…”. curio (no longer in active development) grew out of David Beazley’s famous Die Threads presentation. Around the same time, NJS (Nathaniel J. Smith) published “Notes on Structured Concurrency, or: Go Statement Considered Harmful” and gave us trio, taking inspiration from curio. “Trio is not compatible with asyncio, nor gevent or twisted by default. This means it’s also its little own async island.” You can, however, use AnyIO as the abstraction layer.
AnyIO implements trio-like structured concurrency (a good intro on SC) on top of asyncio and works in harmony with the native SC of trio itself. Starlette is based on AnyIO, so you can directly use AnyIO for advanced concurrency use cases. This is my preferred way to write async code.
from anyio import sleep, create_task_group, run
async def sometask(num: int) -> None:
print('Task', num, 'running')
await sleep(1)
print('Task', num, 'finished')
async def main() -> None:
async with create_task_group() as tg:
for num in range(5):
tg.start_soon(sometask, num)
print('All tasks finished!')
run(main)
#3.11+
from anyio import create_task_group
try:
async with create_task_group() as tg:
tg.start_soon(some_task)
tg.start_soon(another_task)
except* ValueError as excgroup:
for exc in excgroup.exceptions:
... # handle each ValueError
except* KeyError as excgroup:
for exc in excgroup.exceptions:
... # handle each KeyError
from anyio import create_task_group, move_on_after, sleep, run
async def main():
async with create_task_group() as tg:
with move_on_after(1) as scope:
print('Starting sleep')
await sleep(2)
print('This should never be printed')
# The cancelled_caught property will be True if timeout was reached
print('Exited cancel scope, cancelled =', scope.cancelled_caught)
run(main)
from anyio import CancelScope, create_task_group, sleep, run
async def external_task():
print('Started sleeping in the external task')
await sleep(1)
print('This line should never be seen')
async def main():
async with create_task_group() as tg:
with CancelScope(shield=True) as scope:
tg.start_soon(external_task)
tg.cancel_scope.cancel()
print('Started sleeping in the host task')
await sleep(1)
print('Finished sleeping in the host task')
run(main)
This is what Starlette/FastAPI use to run sync functions concurrently. The default AnyIO worker thread limiter has a value of 40, meaning that any calls to to_thread.run_sync() without an explicit limiter argument will cause a maximum of 40 threads to be spawned. You can adjust this limit like this: to_thread.current_default_thread_limiter().total_tokens = 60. AnyIO’s default thread pool limiter does not affect the default thread pool executor on asyncio.
import time
from anyio import to_thread, run
async def main():
await to_thread.run_sync(time.sleep, 5)
run(main)
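Combining the two, a sketch of bumping the default limiter before dispatching work to threads (60 is just the number used above):
import time
from anyio import to_thread, run

async def main():
    to_thread.current_default_thread_limiter().total_tokens = 60  # default is 40
    await to_thread.run_sync(time.sleep, 5)

run(main)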
Since Jupyter already has a running event loop, code like the below will throw RuntimeError: asyncio.run() cannot be called from a running event loop. (Or you can simply await main() directly in a cell.)
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
You could add the task to the current loop:
loop = asyncio.get_event_loop()
loop.create_task(main())
Caching is a straightforward extension to coroutines, but it illustrates the parallel worlds between sync functions and async functions.
import asyncio
import functools

import aiohttp

class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        # __await__ must return an iterator; delegate to a helper coroutine
        return self._run().__await__()

    async def _run(self):
        async with self.lock:
            if not self.done:
                self.result = await self.co
                self.done = True
            return self.result

def cacheable(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        return Cacheable(f(*args, **kwargs))
    return wrapped

@functools.lru_cache()
@cacheable
async def foo():
    # url is assumed to be defined elsewhere
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
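Usage sketch under those assumptions (url defined elsewhere): lru_cache returns the same Cacheable on every call, so the second await reuses the cached body instead of issuing another request.
async def main():
    first = await foo()
    second = await foo()   # served from the cache; no second HTTP request
    assert first == second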
The de facto default for making async requests in Python, supposedly faster than httpx.
import asyncio
import aiohttp

params = {'key1': 'value1', 'key2': 'value2'}

async def main():
    async with aiohttp.ClientSession('http://httpbin.org') as session:
        async with session.get('/get', params=params):
            pass
        async with session.post('/post', json={'test': 'object'}):
            pass
        async with session.put('/put', data=b'data') as resp:
            print(await resp.json(content_type=None))

asyncio.run(main())
# With asyncio
urls = [
'https://www.google.com',
'https://www.facebook.com',
'https://www.twitter.com'
]
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
print(response)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# Not Recommended - I'd try psycopg 3 instead in production
# https://www.psycopg.org/psycopg3/docs/advanced/async.html
import asyncio
async def create_table(conn):
await conn.execute("DROP TABLE IF EXISTS tbl")
await conn.execute(
"""CREATE TABLE tbl (
id serial PRIMARY KEY,
val varchar(255))"""
)
from aiopg.sa import create_engine
async def go():
async with create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
) as engine:
async with engine.acquire() as conn:
await create_table(conn)
async with engine.acquire() as conn:
await ...
From AWS: “Unlike Resources and Sessions, clients are generally thread-safe.” Try to stick to a singleton client with the global session between threads. In case you see the KeyError: 'credential_provider':
def get_component(self, name):
    if name in self._deferred:
        factory = self._deferred[name]
        self._components[name] = factory()
        # Only delete the component from the deferred dict after
        # successfully creating the object from the factory as well as
        # injecting the instantiated value into the _components dict.
        del self._deferred[name]  # <---- THIS IS WHERE IT FAILED
    try:
        return self._components[name]
    except KeyError:
        raise ValueError("Unknown component: %s" % name)
The shared session (client instance) through DEFAULT_SESSION is mutated by multiple threads. The expected key credential_provider in the self._deferred dictionary has already been deleted by another thread, so when this thread tries to delete the key credential_provider, it fails because the key is already gone. (Gatsby Lee, Misunderstanding about thread-safe)
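A hedged sketch of the singleton-client pattern described above (bucket and key names are placeholders): create the client once, up front, and only then share it across threads.
import concurrent.futures
import boto3

s3 = boto3.session.Session().client("s3")   # built once, before any threads start

def fetch(bucket, key):
    # the client object itself is generally safe to call from multiple threads
    return s3.get_object(Bucket=bucket, Key=key)["Body"].read()

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(fetch, "my-bucket", f"key-{i}") for i in range(8)]
    results = [f.result() for f in futures]
For comparison, the aiobotocore version of a similar flow: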
# Not Recommended - I'd run boto3 with asyncio instead
import asyncio
from aiobotocore.session import get_session
AWS_ACCESS_KEY_ID = "xxx"
AWS_SECRET_ACCESS_KEY = "xxx"
async def go():
bucket = 'dataintake'
filename = 'dummy.bin'
folder = 'aiobotocore'
key = f'{folder}/{filename}'
session = get_session()
async with session.create_client(
's3',
region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID,
) as client:
data = b'\x01' * 1024
resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
print(resp)
resp = await client.get_object_acl(Bucket=bucket, Key=key)
print(resp)
resp = await client.get_object(Bucket=bucket, Key=key)
async with resp['Body'] as stream:
await stream.read()
print(resp)
resp = await client.delete_object(Bucket=bucket, Key=key)
print(resp)
if __name__ == '__main__':
asyncio.run(go())
I use flask for visualization tasks, particularly when incorporating dash is necessary. For most data-centric projects intended for production, I would lean towards FastAPI, as suggested by the “API” in its name. It performs well right from the start, and its integration with pydantic facilitates validation, proving especially useful for serialization and deserialization. I don’t particularly appreciate how it manages dependency injection: singletons tend to be more beneficial in a production environment.
Use async def endpoints if you have an await inside; otherwise, use def endpoints. Even if you use def instead of async def, it is still an async endpoint underneath: “When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server). If you are coming from another async framework that does not work in the way described above and you are used to defining trivial compute-only path operation functions with plain def for a tiny performance gain (about 100 nanoseconds), please note that in FastAPI the effect would be quite opposite. In these cases, it’s better to use async def unless your path operation functions use code that performs blocking I/O.” (The threadpool here is anyio’s worker thread pool.) Don’t use async def if your code is blocking, as it would block the event loop.
Comparable performance between Starlette and FastAPI.
# client.py
# SAME CLIENT THROUGHOUT THE EXAMPLES
import aiohttp
import asyncio
import time
n_requests = 50
urls = [f"http://localhost:5003/{i}" for i in range(n_requests)]
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
print(response)
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(main())
end = time.time()
print(f"""# {n_requests:-3.0f} requests:
# {end - start: .4f}s Total @{(end-start) / n_requests: .4f}s per req""")
# server.py
from fastapi import FastAPI
import threading
import asyncio
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
from time import sleep
app = FastAPI()
@app.on_event("startup")
def startup():
print("start up ...")
RunVar("_default_thread_limiter").set(CapacityLimiter(5))
@app.get("/{id}")
def root(id=0):
sleep(1)
print(threading.active_count())
return {"message": f"Responding to request {id}"}
# server:
# Active thread count: 6
# ---
# 50 requests:
# 10.0962s Total @ 0.2019s per req
...
@app.get("/{id}")
async def root(id=0):
await asyncio.sleep(1)
print(threading.active_count())
return {"message": f"Responding to request {id}"}
# server:
# Active thread count: 1
# ---
# client:
# 50 requests:
# 1.0388s Total @ 0.0208s per req
Not too bad to run 1000 requests at about 10 ms per request. This is proportional to the sleep time.
# await asyncio.sleep(1)
# 1000 requests:
# 10.2364s Total @ 0.0102s per req
# await asyncio.sleep(0.1)
# 1000 requests:
# 1.1680s Total @ 0.0012s per req
# await asyncio.sleep(0.01)
# 1000 requests:
# 0.2465s Total @ 0.0002s per req
Introducing blocking code in the async function. This is bad practice, as it blocks the event loop. The FastAPI server would still be able to handle requests, but the response time would be significantly longer.
...
@app.get("/{id}")
async def root(id=0):
sleep(1)
print(threading.active_count())
return {"message": f"Responding to request {id}"}
# Active thread count: 1
# ---
# client:
# 50 requests:
# 50.3450s Total @ 1.0069s per req
Essentially, there is no await in the async code. Python would emit the following warning, but it is suppressed by FastAPI:
RuntimeWarning: coroutine 'sleep' was never awaited
asyncio.sleep(1)
...
@app.get("/{id}")
async def root(id=0):
asyncio.sleep(1)
print("Active thread count: ", threading.active_count())
return {"message": f"Responding to request {id}"}
# server:
# Active thread count: 1
# ---
# client:
# 50 requests:
# 0.0360s Total @ 0.0007s per req
You would have similar performance if you used def instead of async def. Essentially, asyncio.sleep(1) returns a coroutine that is simply ignored. You would, however, expect a SyntaxError if you used await in a def function:
await asyncio.sleep(1)
^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: 'await' outside async function
Nothing special about background tasks: Starlette sends sync functions to the thread pool and async functions to the event loop. Blocking code in the event loop can block the entire server, same as before.
# client.py
import aiohttp
import asyncio
import time
host = "localhost"
port = 5010
base = f"http://{host}:{port}"
n_requests = 50
urls = [f"{base}/cpu_bound2"] + [f"{base}/{i}" for i in range(n_requests)]
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
responses = await asyncio.gather(*tasks)
for response in responses:
print(response)
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(main())
end = time.time()
print(
f"""# {n_requests:-3.0f} requests:
# {end - start: .4f}s Total @{(end-start) / n_requests: .4f}s per req"""
)
# server
from fastapi import FastAPI, BackgroundTasks
import threading
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
from anyio import sleep as async_sleep  # non-blocking sleep for the endpoint below; time.sleep cannot be awaited
from time import sleep  # blocking sleep, dispatched to the thread pool as a background task
import starlette.background as bg
app = FastAPI()
@app.on_event("startup")
def startup():
print("start up ...")
RunVar("_default_thread_limiter").set(CapacityLimiter(100))
# FastAPI style
@app.get("/cpu_bound")
async def cpu_bound(background_tasks: BackgroundTasks):
background_tasks.add_task(sleep, 1)
return {"message": "CPU bound task"}
# Starlette style
@app.get("/cpu_bound2")
async def cpu_bound():
task = bg.BackgroundTasks()
task.add_task(sleep, 1)
return {"message": "CPU bound task"}
@app.get("/{id}")
async def root(id=0):
    await async_sleep(1)
print(threading.active_count())
return {"message": f"Responding to request {id}"}
# server:
# Active thread count: 1
# ---
# client:
# 50 requests:
# 1.0360s Total @ 0.0207s per req
This is the entire starlette.background
module:
# starlette.background.py
import asyncio
import sys
import typing
if sys.version_info >= (3, 10): # pragma: no cover
from typing import ParamSpec
else: # pragma: no cover
from typing_extensions import ParamSpec
from starlette.concurrency import run_in_threadpool
P = ParamSpec("P")
class BackgroundTask:
def __init__(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
await self.func(*self.args, **self.kwargs)
else:
await run_in_threadpool(self.func, *self.args, **self.kwargs)
class BackgroundTasks(BackgroundTask):
def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
self.tasks = list(tasks) if tasks else []
def add_task(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
async def __call__(self) -> None:
for task in self.tasks:
await task()
# starlette.concurrency.py
async def run_in_threadpool(
func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
if kwargs: # pragma: no cover
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args)
Celery worker and redis broker. If you really want to decouple CPU-bound tasks from the main event loop, you can use celery workers. This makes requests really fast: 0.7 s total time for 1000 requests, 0.7 ms per request.
# server.py
from fastapi import FastAPI, BackgroundTasks
import threading
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
import asyncio
# from anyio import sleep
import starlette.background as bg
from celery import Celery
app = FastAPI()
celery = Celery(
__name__, broker="redis://127.0.0.1:6379/0", backend="redis://127.0.0.1:6379/0"
)
@app.on_event("startup")
def startup():
print("start up ...")
RunVar("_default_thread_limiter").set(CapacityLimiter(100))
@celery.task
def cpu_bound(x):
import time
time.sleep(1)
return f"cpu_bound {x}"
@app.get("/{id}")
async def root(id=0):
r = cpu_bound.delay(id)
print(threading.active_count())
return {"message": f"Responding to request {id} ({r.id})"}
# server:
# 1000 requests:
# 0.7436s Total @ 0.0007s per req
#
# ---
# client:
# 2 workers with 20 concurrency each
#
# start: 2024-04-02 03:30:44.492
# end: 2024-04-02 03:31:08.986
#
# 24.494s Total @ 0.0245s per req
Performance suggestions from Marcelo Trylesinski:
- uvloop – fast drop-in replacement of the built-in asyncio event loop
- httptools – faster HTTP parser
- async def – a def endpoint will be converted to async and run via the executor anyway; using async def avoids that overhead. To raise the worker-thread limit:
async def startup():
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 1000  # default is 40
app = FastAPI(on_startup=[startup])
- response_model
- ORJSON – app = FastAPI(default_response_class=ORJSONResponse)
- pydantic
- GZipMiddleware
- Use ASGI middleware, instead of BaseHTTPMiddleware
from fastapi import FastAPI, Request
from starlette.datastructures import MutableHeaders
class AddPotatoHeaderMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        async def send_with_potato(message):
            if message["type"] == "http.response.start":
                headers = MutableHeaders(scope=message)
                headers.append("X-Potato", "Banana")
            await send(message)

        await self.app(scope, receive, send_with_potato)
app = FastAPI()
app.add_middleware(AddPotatoHeaderMiddleware)
Unfortunately, “asynchronous” systems have often been evangelized by emphasizing a somewhat dubious optimization which allows for a higher level of I/O-bound concurrency than with preemptive threads, rather than the problems with threading as a programming model that I’ve explained above. By characterizing “asynchronousness” in this way, it makes sense to lump all 4 choices together. I’ve been guilty of this myself, especially in years past: saying that a system using Twisted is more efficient than one using an alternative approach using threads. In many cases that’s been true, but:
- the situation is almost always more complicated than that, when it comes to performance,
- “context switching” is rarely a bottleneck in real-world programs, and
- it’s a bit of a distraction from the much bigger advantage of event-driven programming, which is simply that it’s easier to write programs at scale, in both senses (that is, programs containing lots of code as well as programs which have many concurrent users). A system that presents “implicit coroutines” – ones that may transfer control to another concurrent task at any layer of the stack without any syntactic indication that this may happen – is simply the dubious optimization by itself.