Migrating to AsyncIO
This chapter aims to guide you through the process of migrating your codebase from Eventlet to AsyncIO. We will cover the necessary steps, best practices, and common pitfalls to avoid during the migration.
Overview of AsyncIO
AsyncIO is a library to write concurrent code using the async/await syntax. It is part of the Python standard library and provides a foundation for asynchronous programming in Python.
AsyncIO allows you to write programs that can handle many tasks at once, such as network requests, file I/O, and other I/O-bound operations, without blocking the main thread. This is achieved through the use of coroutines, which are special functions that can pause and resume their execution, allowing other coroutines to run in the meantime.
The core components of AsyncIO include:
- Event Loop: The central component that manages the execution of asynchronous tasks. It runs in a single thread and schedules the execution of coroutines, callbacks, and other I/O operations.
- Coroutines: Functions defined with the async def syntax. They use the await keyword to pause their execution and yield control back to the event loop.
- Tasks: Wrappers around coroutines that allow them to be scheduled and managed by the event loop. They can be created using the asyncio.create_task() function.
- Futures: Objects that represent the result of an asynchronous operation. They can be awaited to retrieve the result once the operation is complete.
AsyncIO also provides various utilities for managing asynchronous tasks, such as synchronization primitives (e.g., locks, semaphores), queues, and streams for handling network I/O.
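A minimal sketch tying these components together (the coroutine names and delays are illustrative only):

import asyncio

async def greet(name, delay):
    # A coroutine: it pauses at each await, letting the event loop run other tasks
    await asyncio.sleep(delay)
    return f"Hello, {name}!"

async def main():
    # Wrap coroutines in Tasks so the event loop schedules them concurrently
    task1 = asyncio.create_task(greet("Alice", 1))
    task2 = asyncio.create_task(greet("Bob", 2))
    # Awaiting a Task retrieves its result once the coroutine completes
    print(await task1)
    print(await task2)

# asyncio.run() creates the event loop, runs main(), and closes the loop
asyncio.run(main())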
For more details, you can consult the official AsyncIO documentation at https://docs.python.org/3/library/asyncio.html.
Steps to Migrate
The following steps outline the process of migrating from Eventlet to AsyncIO:
- Activate the AsyncIO hub in Eventlet: Begin by switching to Eventlet's AsyncIO hub to enable compatibility with AsyncIO code.
import eventlet
eventlet.use_hub('asyncio')

# If you use monkey patching, place it after hub activation
eventlet.monkey_patch()
This critical first step allows you to gradually introduce AsyncIO code while maintaining compatibility with existing Eventlet code. For more details, refer to the official Eventlet documentation.
- Identify Eventlet usages in your codebase.
- Replace Eventlet-specific code with AsyncIO equivalents.
- Refactor synchronous code to use async/await syntax.
- Test the migrated code thoroughly to ensure functionality.
- Optimize performance and handle edge cases.
Replacing Eventlet with AsyncIO
This section provides examples of how to replace common Eventlet-specific code with AsyncIO equivalents. For each common Eventlet usage, we'll show how to refactor the code using AsyncIO.
WSGI Server
Eventlet:
from eventlet import wsgi, listen
def simple_app(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'Hello, World!\n']
wsgi.server(listen(('', 8080)), simple_app)
AsyncIO:
from aiohttp import web
async def simple_app(request):
return web.Response(text="Hello, World!")
app = web.Application()
app.add_routes([web.get('/', simple_app)])
if __name__ == '__main__':
web.run_app(app, port=8080)
To replace an Eventlet WSGI server, use an async framework such as aiohttp (shown above), or an ASGI framework like FastAPI served by uvicorn.
Asynchronous Network Calls
Eventlet:
import eventlet
from eventlet.green.urllib.request import urlopen
def fetch_url(url):
    return urlopen(url).read()
urls = ['http://example.com', 'http://example.org']
pool = eventlet.GreenPool()
for body in pool.imap(fetch_url, urls):
print(body)
AsyncIO:
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['http://example.com', 'http://example.org']
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
for body in await asyncio.gather(*tasks):
print(body)
asyncio.run(main())
For HTTP requests, use aiohttp.ClientSession with asyncio.gather to run multiple requests concurrently.
Background Execution of Long Tasks
Eventlet:
import eventlet
def long_running_task():
# Simulate a long-running task
eventlet.sleep(10)
print("Task completed")
eventlet.spawn(long_running_task)
print("Main thread continues to run")
AsyncIO:
import asyncio
async def long_running_task():
# Simulate a long-running task
await asyncio.sleep(10)
print("Task completed")
async def main():
# Create a task that runs in the background
task = asyncio.create_task(long_running_task())
print("Main thread continues to run")
# Wait for the task to complete
await task
asyncio.run(main())
Use asyncio.create_task() to run coroutines concurrently in the background.
Deferred Task Management
Eventlet:
import eventlet
from eventlet.event import Event
def deferred_task(evt):
print("Waiting for event")
evt.wait()
print("Event occurred, executing task")
evt = Event()
eventlet.spawn_after(5, deferred_task, evt)
eventlet.sleep(5)
evt.send()
AsyncIO:
import asyncio
async def deferred_task(event):
print("Waiting for event")
await event.wait()
print("Event occurred, executing task")
async def main():
event = asyncio.Event()
# Schedule the task
task = asyncio.create_task(deferred_task(event))
# Wait a bit then set the event
await asyncio.sleep(5)
event.set()
await task
asyncio.run(main())
Use asyncio.Event() for synchronization between coroutines.
Green Thread Management
Eventlet:
import eventlet
def task1():
while True:
print("Task 1 running")
eventlet.sleep(1)
def task2():
while True:
print("Task 2 running")
eventlet.sleep(1)
eventlet.spawn(task1)
eventlet.spawn(task2)
eventlet.sleep(5) # Let tasks run for a while
AsyncIO:
import asyncio
async def task1():
while True:
print("Task 1 running")
await asyncio.sleep(1)
async def task2():
while True:
print("Task 2 running")
await asyncio.sleep(1)
async def main():
# Create tasks
task_1 = asyncio.create_task(task1())
task_2 = asyncio.create_task(task2())
# Run for a limited time
await asyncio.sleep(5)
# Cancel tasks
task_1.cancel()
task_2.cancel()
try:
await asyncio.gather(task_1, task_2)
except asyncio.CancelledError:
pass
asyncio.run(main())
Use asyncio.create_task() to run multiple coroutines concurrently within a single thread.
Socket Compatibility
Eventlet:
import eventlet
from eventlet.green import socket
def echo_server(port):
server = socket.socket()
server.bind(('0.0.0.0', port))
server.listen(5)
while True:
client_socket, addr = server.accept()
eventlet.spawn(handle_client, client_socket)
def handle_client(client_socket):
while True:
data = client_socket.recv(1024)
if not data:
break
client_socket.sendall(data)
client_socket.close()
echo_server(6000)
AsyncIO:
import asyncio
async def handle_client(reader, writer):
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data)
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_client, '0.0.0.0', 6000
)
async with server:
await server.serve_forever()
asyncio.run(main())
Use asyncio.start_server() for TCP servers and asyncio.open_connection() for TCP clients.
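As a complement to the server example above, here is a minimal client sketch using asyncio.open_connection(); the host, port, and message are placeholders matching the echo server:

import asyncio

async def echo_client(message):
    # Connect to the echo server from the example above
    reader, writer = await asyncio.open_connection('127.0.0.1', 6000)
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(1024)
    print(f"Received: {data.decode()}")
    writer.close()
    await writer.wait_closed()

asyncio.run(echo_client("Hello, AsyncIO!"))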
WebSocket Support
Eventlet:
import eventlet
from eventlet import websocket, wsgi
@websocket.WebSocketWSGI
def echo(ws):
while True:
msg = ws.wait()
if msg is None:
break
ws.send(msg)
server = eventlet.listen(('0.0.0.0', 7000))
eventlet.wsgi.server(server, echo)
AsyncIO:
import asyncio
import websockets
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def main():
async with websockets.serve(echo, "0.0.0.0", 7000):
await asyncio.Future() # Run forever
asyncio.run(main())
Use the websockets library for WebSocket communication.
Integration with Other Libraries
Eventlet:
import eventlet
from eventlet.green import threading
from eventlet import queue
def worker():
while True:
item = q.get()
if item is None:
break
print(f"Processing {item}")
q.task_done()
q = queue.Queue()
threads = []
for i in range(3):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in range(10):
q.put(item)
q.join()
for i in range(3):
q.put(None)
for t in threads:
t.join()
AsyncIO:
import asyncio
async def worker(name, queue):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Worker {name} processing {item}")
await asyncio.sleep(0.1) # Simulate processing time
queue.task_done()
async def main():
queue = asyncio.Queue()
# Create worker tasks
workers = [
asyncio.create_task(worker(f"worker-{i}", queue))
for i in range(3)
]
# Add items to the queue
for item in range(10):
await queue.put(item)
# Wait until all items are processed
await queue.join()
# Send shutdown signal to workers
for _ in workers:
await queue.put(None)
# Wait for workers to finish
await asyncio.gather(*workers)
asyncio.run(main())
Use asyncio.Queue for task distribution among coroutines.
Tips for Refactoring
- Start by identifying pure I/O operations that can be made asynchronous
- Refactor functions into coroutines by adding async to function definitions and await to calls that need to wait
- Use asyncio.gather() to run multiple coroutines concurrently
- Be careful with CPU-bound tasks: offload them to a concurrent.futures.ProcessPoolExecutor via loop.run_in_executor(), and reserve asyncio.to_thread() for blocking I/O calls (see the sketch after this list)
- Keep synchronization in mind: use asyncio.Lock(), asyncio.Semaphore(), and other primitives
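To illustrate the CPU-bound advice above, here is a minimal sketch; crunch_numbers() and blocking_io() are hypothetical stand-ins for your own CPU-heavy work and for a blocking library call:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def crunch_numbers(n):
    # CPU-bound work: runs in a separate process so it does not block the loop
    return sum(i * i for i in range(n))

def blocking_io():
    # Blocking call from a library without async support (placeholder)
    time.sleep(1)
    return "io done"

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # CPU-bound: offload to a process pool
        result = await loop.run_in_executor(pool, crunch_numbers, 10_000_000)
    # Blocking I/O: a worker thread is usually enough
    io_result = await asyncio.to_thread(blocking_io)
    print(result, io_result)

if __name__ == '__main__':
    asyncio.run(main())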
Advanced Migration Scenarios
Beyond the basic patterns shown above, here are some additional migration scenarios you might encounter:
Handling Monkey Patching
Eventlet:
import eventlet
eventlet.monkey_patch()
import socket
import time
def blocking_operation():
# This socket operation is patched to be non-blocking
sock = socket.socket()
sock.connect(('example.com', 80))
# This sleep is patched to yield to other green threads
time.sleep(1)
AsyncIO:
import asyncio
import socket
async def non_blocking_operation():
# Use AsyncIO's explicit non-blocking operations
reader, writer = await asyncio.open_connection('example.com', 80)
# Explicitly yield control with await
await asyncio.sleep(1)
# When dealing with libraries that don't support AsyncIO:
# Use executor for blocking operations
result = await asyncio.to_thread(some_blocking_function)
Replace implicit monkey patching with explicit async calls. Use asyncio.to_thread() for unavoidable blocking operations.
Timeouts and Exception Handling
Eventlet:
import eventlet
def with_timeout():
try:
with eventlet.Timeout(5):
# Operation that should timeout
eventlet.sleep(10)
print("This won't be reached")
except eventlet.Timeout:
print("Operation timed out")
AsyncIO:
import asyncio
async def with_timeout():
try:
# Operation that should timeout
await asyncio.wait_for(asyncio.sleep(10), timeout=5)
print("This won't be reached")
except asyncio.TimeoutError:
print("Operation timed out")
# Alternatively, use shield to protect from cancellation
try:
result = await asyncio.wait_for(
asyncio.shield(important_operation()),
timeout=5
)
except asyncio.TimeoutError:
print("Operation timed out but continues running")
Use asyncio.wait_for() for timeouts and asyncio.shield() to protect critical operations from cancellation.
Testing Async Code
Eventlet:
import unittest
import eventlet
def test_function():
eventlet.sleep(0.1)
return "done"
class TestEventlet(unittest.TestCase):
def test_async_function(self):
result = test_function()
self.assertEqual(result, "done")
AsyncIO:
import unittest
import asyncio
async def test_function():
await asyncio.sleep(0.1)
return "done"
class TestAsyncIO(unittest.TestCase):
def test_async_function(self):
result = asyncio.run(test_function())
self.assertEqual(result, "done")
# With pytest and the pytest-asyncio plugin you can write:
# @pytest.mark.asyncio
# async def test_async_function():
#     result = await test_function()
#     assert result == "done"
Use asyncio.run() in standard unittest, or the @pytest.mark.asyncio decorator from the pytest-asyncio plugin, for async tests.
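The standard library also provides unittest.IsolatedAsyncioTestCase (Python 3.8+), which runs async test methods in their own event loop; fetch_status() below is a hypothetical coroutine standing in for your own code:

import asyncio
import unittest

async def fetch_status():
    await asyncio.sleep(0.1)
    return "done"

class TestWithAsyncioTestCase(unittest.IsolatedAsyncioTestCase):
    async def test_fetch_status(self):
        # No explicit asyncio.run() needed: the test case manages the loop
        result = await fetch_status()
        self.assertEqual(result, "done")

if __name__ == '__main__':
    unittest.main()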
Resource Pooling
Eventlet:
from eventlet.db_pool import ConnectionPool
import MySQLdb

# Create a pool of green-thread-safe database connections.
# ConnectionPool wraps a DB-API module and the arguments for its connect() call.
pool = ConnectionPool(MySQLdb, max_size=10,
                      host='localhost', user='user', passwd='pass', db='mydb')

def use_db():
    conn = pool.get()
    try:
        # Use the connection
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM table")
    finally:
        pool.put(conn)
AsyncIO:
import asyncio
import asyncpg
async def use_db():
# Using connection pools with AsyncIO
pool = await asyncpg.create_pool(dsn="postgres://user:pass@host/db",
min_size=5,
max_size=10)
async with pool.acquire() as conn:
# Use the connection
result = await conn.fetch("SELECT * FROM table")
await pool.close() # Close the pool when done
Use async database libraries like asyncpg, aiomysql, or aiosqlite that provide built-in connection pooling.
Web Framework Integration
Eventlet with Flask:
import eventlet
eventlet.monkey_patch()
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return "Hello, World!"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
AsyncIO-based Alternative:
# Option 1: Use an async framework like FastAPI
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get('/')
async def index():
return {"message": "Hello, World!"}
if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=8000)
# Option 2: Use ASGI adapters for existing frameworks
# For Flask:
from flask import Flask
import asyncio
from asgiref.wsgi import WsgiToAsgi
import uvicorn
app = Flask(__name__)
@app.route('/')
def index():
return "Hello, World!"
asgi_app = WsgiToAsgi(app)
if __name__ == '__main__':
uvicorn.run(asgi_app, host='0.0.0.0', port=8000)
Either migrate to native async frameworks like FastAPI, or use ASGI adapters to run traditional WSGI frameworks with async servers.
Sync-Async Adapter Pattern
The Sync-Async Adapter Pattern enables a library to support both synchronous and asynchronous execution modes, while internally using a unified implementation based on asyncio.
This pattern is particularly suitable for refactoring existing libraries originally based on Eventlet, allowing them to remove their dependency on eventlet.monkey_patch(). A single library can thus operate seamlessly both in modern asyncio-based applications and in traditional synchronous environments.
Initial Use Case (with Eventlet)
Consider a hypothetical library named hypotheticlib that exposes a synchronous HTTP client:
# hypotheticlib.py
import requests
class Client:
headers = {
'User-Agent': 'sync-client'
}
def request(self, url):
response = requests.get(url, headers=self.headers)
return response.json()
Currently, some applications utilize Eventlet to make this library implicitly non-blocking:
import eventlet
eventlet.monkey_patch()
from hypotheticlib import Client
client = Client()
result = client.request("https://api.example.com/data") # Non-blocking due to Eventlet
This approach has several drawbacks:
- Direct dependency on Eventlet and global monkey-patching.
- Maintenance difficulties due to monkey-patching side-effects.
- No native compatibility with asyncio.
Refactoring with the Sync-Async Adapter Pattern
The refactoring goal is to completely replace Eventlet with a native asyncio-based implementation while preserving an interface usable in both synchronous and asynchronous contexts.
Here's an example of how the refactored implementation would look:
import asyncio
import aiohttp
class AsyncClient:
"""Native asyncio-based implementation."""
headers = {
'User-Agent': 'async-client'
}
async def request(self, url):
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.get(url) as response:
return await response.json()
class Client(AsyncClient):
    """Adapter enabling synchronous usage of AsyncClient."""

    def _run_coroutine(self, coro):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread: create one and run
            # the coroutine to completion synchronously.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(coro)
            finally:
                loop.close()
        raise RuntimeError(
            "Cannot run synchronous call when an event loop is already running."
        )

    def request(self, url):
        return self._run_coroutine(super().request(url))
Managing asyncio Event Loops
The _run_coroutine() method handles the event loop as follows:
- It first checks whether an event loop is currently running in the calling thread by calling asyncio.get_running_loop().
- If no loop is running, it creates a new loop with asyncio.new_event_loop(), executes the coroutine synchronously with run_until_complete(), and closes the loop afterwards.
- If a running loop is detected, it raises an explicit error, as synchronous calls cannot be executed in this context without additional handling.
This approach ensures robust management of asyncio event loops.
Examples of Usage after Refactoring
Explicit asynchronous usage (asyncio):
import asyncio
from hypotheticlib import AsyncClient
async def main():
client = AsyncClient()
result = await client.request("https://api.example.com/data")
print(result)
asyncio.run(main())
Traditional synchronous usage:
from hypotheticlib import Client
client = Client()
result = client.request("https://api.example.com/data")
print(result)
Why is this pattern effective for replacing Eventlet?
- Complete removal of Eventlet monkey-patching: no more global dynamic patching, significantly reducing side-effects.
- Transparent compatibility with asyncio: enables direct integration into modern applications that follow the async/await paradigm.
- Interoperability with existing code: the same library remains equally usable in traditional synchronous contexts and in modern async scenarios.
Limitations and considerations
- The adapter cannot execute synchronous calls if an asyncio event loop is already running in the calling thread. In such scenarios, explicit asynchronous usage is required (a common thread-based workaround is sketched below).
- Long-running CPU-bound operations are not recommended within this pattern, as they can block the event loop.
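As a hedged sketch of such a workaround (not part of the pattern itself), code that is stuck inside a running event loop can dispatch the synchronous adapter to a worker thread, where the adapter is free to create its own private loop:

import asyncio
from concurrent.futures import ThreadPoolExecutor

from hypotheticlib import Client  # the hypothetical library from this section

async def main():
    client = Client()
    loop = asyncio.get_running_loop()
    # The synchronous adapter cannot run in this already-running loop, so run
    # it in a worker thread, where _run_coroutine() can create a new loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        result = await loop.run_in_executor(
            pool, client.request, "https://api.example.com/data"
        )
    print(result)

asyncio.run(main())

In most cases, however, simply using AsyncClient directly is the cleaner option.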
Last words about this pattern
This pattern is inspired by a discussion in the Python bug tracker and by the edgedb-python library (now named gel-python), whose codebase provides concrete examples of this approach.
The Sync-Async Adapter Pattern simplifies refactoring Eventlet-dependent libraries towards a native asyncio architecture while maintaining backward compatibility with traditional synchronous use-cases. This approach enables a smooth migration towards modern async environments, simplifies maintenance, and increases overall application robustness.
AsyncIO Libraries
There are plenty of AsyncIO-based libraries. We invite you to have a look at curated lists of AsyncIO libraries, such as the community-maintained awesome-asyncio collection.
Common Pitfalls
Be aware of the following common pitfalls during the migration:
Blocking calls within async functions
Blocking calls can halt the execution of your async code, leading to performance issues. Ensure that all I/O operations are non-blocking and use appropriate AsyncIO methods.
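For example, the following sketch contrasts a coroutine that blocks the event loop with a non-blocking equivalent; requests is used here only as a stand-in for any blocking library:

import asyncio
import time

import requests  # stand-in for any blocking library

async def bad():
    # Both calls block the event loop: no other task can run in the meantime
    time.sleep(1)
    return requests.get("https://example.com").status_code

async def good():
    # Non-blocking alternatives: await an async sleep, and push the blocking
    # library call into a worker thread
    await asyncio.sleep(1)
    response = await asyncio.to_thread(requests.get, "https://example.com")
    return response.status_code

print(asyncio.run(good()))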
Improper use of async/await syntax
Incorrect usage of async/await can lead to unexpected behavior and bugs. Familiarize yourself with the correct patterns and practices for using async/await in Python.
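A classic mistake is calling a coroutine function without awaiting it, which only creates a coroutine object and never runs it; save_record() below is a hypothetical example:

import asyncio

async def save_record(record):
    await asyncio.sleep(0.1)  # simulate an asynchronous write
    print(f"Saved {record}")

async def main():
    # Wrong: nothing runs, and Python emits
    # "RuntimeWarning: coroutine 'save_record' was never awaited"
    save_record("a")
    # Right: await it, or wrap it in asyncio.create_task() to run it concurrently
    await save_record("b")

asyncio.run(main())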
Concurrency issues and race conditions
Concurrency issues and race conditions can arise when multiple tasks access shared resources. Use synchronization primitives like locks, semaphores, and queues to manage access to shared resources.
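The sketch below shows a read-modify-write sequence that can interleave between tasks at every await, and how asyncio.Lock() protects the critical section:

import asyncio

counter = 0
lock = asyncio.Lock()

async def unsafe_increment():
    global counter
    value = counter
    await asyncio.sleep(0)   # yields to the loop: another task may interleave here
    counter = value + 1      # may overwrite an update made by another task

async def safe_increment():
    global counter
    async with lock:         # only one task at a time enters this section
        value = counter
        await asyncio.sleep(0)
        counter = value + 1

async def main():
    await asyncio.gather(*(safe_increment() for _ in range(100)))
    print(counter)  # 100; unsafe_increment() would not guarantee this

asyncio.run(main())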
Performance bottlenecks due to improper use of AsyncIO features
Improper use of AsyncIO features can lead to performance bottlenecks. Profile your code to identify slow parts and optimize them by using efficient AsyncIO patterns and techniques.
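A simple starting point is AsyncIO's built-in debug mode, which logs callbacks and tasks that hold the event loop longer than loop.slow_callback_duration (0.1 seconds by default); the sketch below deliberately blocks the loop to trigger such a report:

import asyncio
import time

async def slow_step():
    # Blocks the loop for 0.5 s; debug mode reports it as a slow callback
    time.sleep(0.5)

async def main():
    await slow_step()

# debug=True enables extra checks and logs slow callbacks and never-awaited coroutines
asyncio.run(main(), debug=True)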
Conclusion
Migrating to AsyncIO can significantly improve the performance and scalability of your application. Follow the steps outlined in this guide and be mindful of common pitfalls to ensure a smooth migration process.