Eventlet Removal

Migrating to AsyncIO

This chapter aims to guide you through the process of migrating your codebase from Eventlet to AsyncIO. We will cover the necessary steps, best practices, and common pitfalls to avoid during the migration.

Overview of AsyncIO

AsyncIO is a library to write concurrent code using the async/await syntax. It is part of the Python standard library and provides a foundation for asynchronous programming in Python.

AsyncIO allows you to write programs that can handle many tasks at once, such as network requests, file I/O, and other I/O-bound operations, without blocking the main thread. This is achieved through the use of coroutines, which are special functions that can pause and resume their execution, allowing other coroutines to run in the meantime.

The core components of AsyncIO include:

  • Event Loop: The central component that manages the execution of asynchronous tasks. It runs in a single thread and schedules coroutines, callbacks, and I/O operations.
  • Coroutines: Functions defined with the async def syntax. They use the await keyword to pause their execution and yield control back to the event loop.
  • Tasks: Wrappers around coroutines that allow them to be scheduled and managed by the event loop. They can be created using the asyncio.create_task() function.
  • Futures: Objects that represent the result of an asynchronous operation. They can be awaited to retrieve the result once the operation is complete.

AsyncIO also provides various utilities for managing asynchronous tasks, such as synchronization primitives (e.g., locks, semaphores), queues, and streams for handling network I/O.
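
As a minimal sketch of how these components fit together (the names fetch_value and main are illustrative and not part of any library), the following awaits a coroutine directly, wraps one in a task, and resolves a future by hand:

```python
import asyncio


async def fetch_value(delay, value):
    # A coroutine: 'await' hands control back to the event loop while sleeping.
    await asyncio.sleep(delay)
    return value


async def main():
    loop = asyncio.get_running_loop()

    # A task schedules the coroutine to run on the event loop.
    task = asyncio.create_task(fetch_value(0.01, "from task"))

    # A future is a placeholder for a result that is set later.
    future = loop.create_future()
    loop.call_later(0.01, future.set_result, "from future")

    # Awaiting a coroutine directly runs it to completion in place.
    direct = await fetch_value(0.01, "from coroutine")
    return direct, await task, await future


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All three results arrive through the same event loop; the task and the future make progress while main() is suspended on its own await.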

For more details, consult the official asyncio documentation.

Steps to Migrate

The following steps outline the process of migrating from Eventlet to AsyncIO:

  1. Activate the AsyncIO hub in Eventlet: Begin by switching to Eventlet's AsyncIO hub to enable compatibility with AsyncIO code.
    import eventlet
    eventlet.use_hub('asyncio')
    # If you use monkey patching, place it after hub activation
    eventlet.monkey_patch()

    This critical first step allows you to gradually introduce AsyncIO code while maintaining compatibility with existing Eventlet code. For more details, refer to the official Eventlet documentation.

  2. Identify Eventlet usages in your codebase.
  3. Replace Eventlet-specific code with AsyncIO equivalents.
  4. Refactor synchronous code to use async/await syntax.
  5. Test the migrated code thoroughly to ensure functionality.
  6. Optimize performance and handle edge cases.
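
Step 2 can be partly automated. The sketch below is one possible approach, not an official tool: it uses the standard ast module to list every import of eventlet in a piece of source code. The function name find_eventlet_imports and the sample string are assumptions made for illustration.

```python
import ast


def find_eventlet_imports(source, filename="<string>"):
    """Return (line, module) pairs for every import of eventlet in source."""
    hits = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            continue
        for name in names:
            if name == "eventlet" or name.startswith("eventlet."):
                hits.append((node.lineno, name))
    return sorted(hits)


sample = "import os\nimport eventlet\nfrom eventlet.green import socket\n"
print(find_eventlet_imports(sample))
```

To scan a whole project, the same function could be applied to the text of each file returned by pathlib.Path(".").rglob("*.py"). It only finds imports, so dynamic usages still need a manual review.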

Replacing Eventlet with AsyncIO

This section provides examples of how to replace common Eventlet-specific code with AsyncIO equivalents. For each common Eventlet usage, we'll show how to refactor the code using AsyncIO.

WSGI Server

Eventlet:

from eventlet import wsgi, listen

def simple_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello, World!\n']

wsgi.server(listen(('', 8080)), simple_app)

AsyncIO:

from aiohttp import web

async def simple_app(request):
    return web.Response(text="Hello, World!")

app = web.Application()
app.add_routes([web.get('/', simple_app)])

if __name__ == '__main__':
    web.run_app(app, port=8080)

To replace an Eventlet WSGI server, use aiohttp's built-in web server, or an ASGI framework such as FastAPI served by uvicorn.

Asynchronous Network Calls

Eventlet:

import eventlet
# On Python 3, the green urllib lives under eventlet.green.urllib
from eventlet.green.urllib.request import urlopen

def fetch_url(url):
    return urlopen(url).read()

urls = ['http://example.com', 'http://example.org']
pool = eventlet.GreenPool()

for body in pool.imap(fetch_url, urls):
    print(body)

AsyncIO:

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        for body in await asyncio.gather(*tasks):
            print(body)

asyncio.run(main())

For HTTP requests, use aiohttp.ClientSession with asyncio.gather to run multiple requests concurrently.
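
One difference to keep in mind: a GreenPool limits how many green threads run at once, while asyncio.gather() starts everything immediately. A possible way to restore a limit is an asyncio.Semaphore. In this sketch, bounded_gather and fake_fetch are hypothetical helpers, and fake_fetch stands in for a real HTTP call so that the example runs without network access:

```python
import asyncio


async def bounded_gather(coros, limit):
    """Run coroutines concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run_one(c) for c in coros))


async def main():
    running = 0
    peak = 0

    async def fake_fetch(i):
        # Stand-in for a network call; tracks how many run at once.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i * 2

    results = await bounded_gather([fake_fetch(i) for i in range(10)], limit=3)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Results come back in submission order, as with plain asyncio.gather(), and the peak concurrency never exceeds the limit.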

Background Execution of Long Tasks

Eventlet:

import eventlet

def long_running_task():
    # Simulate a long-running task
    eventlet.sleep(10)
    print("Task completed")

gt = eventlet.spawn(long_running_task)
print("Main thread continues to run")
gt.wait()  # Wait for the task to complete before the script exits

AsyncIO:

import asyncio

async def long_running_task():
    # Simulate a long-running task
    await asyncio.sleep(10)
    print("Task completed")

async def main():
    # Create a task that runs in the background
    task = asyncio.create_task(long_running_task())
    print("Main thread continues to run")
    
    # Wait for the task to complete
    await task

asyncio.run(main())

Use asyncio.create_task() to run coroutines concurrently in the background.
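
When porting fire-and-forget eventlet.spawn() calls, note that the event loop keeps only weak references to tasks, so the asyncio documentation recommends holding a reference until the task is done. The sketch below shows one way to do that; the spawn helper and the background_tasks set are illustrative names, not an asyncio API.

```python
import asyncio

background_tasks = set()


def spawn(coro):
    """Schedule a coroutine in the background and keep a reference to it."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task finishes.
    task.add_done_callback(background_tasks.discard)
    return task


async def main():
    results = []

    async def job(n):
        await asyncio.sleep(0.01)
        results.append(n)

    for n in range(3):
        spawn(job(n))

    # asyncio.run() cancels tasks that are still pending when main() returns,
    # so wait for the background work before leaving.
    await asyncio.gather(*background_tasks)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```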

Deferred Task Management

Eventlet:

import eventlet
from eventlet.event import Event

def deferred_task(evt):
    print("Waiting for event")
    evt.wait()
    print("Event occurred, executing task")

evt = Event()
eventlet.spawn_after(5, deferred_task, evt)
eventlet.sleep(5)
evt.send()

AsyncIO:

import asyncio

async def deferred_task(event):
    print("Waiting for event")
    await event.wait()
    print("Event occurred, executing task")

async def main():
    event = asyncio.Event()
    
    # Schedule the task
    task = asyncio.create_task(deferred_task(event))
    
    # Wait a bit then set the event
    await asyncio.sleep(5)
    event.set()
    
    await task

asyncio.run(main())

Use asyncio.Event() for synchronization between coroutines.

Green Thread Management

Eventlet:

import eventlet

def task1():
    while True:
        print("Task 1 running")
        eventlet.sleep(1)

def task2():
    while True:
        print("Task 2 running")
        eventlet.sleep(1)

eventlet.spawn(task1)
eventlet.spawn(task2)
eventlet.sleep(5)  # Let tasks run for a while

AsyncIO:

import asyncio

async def task1():
    while True:
        print("Task 1 running")
        await asyncio.sleep(1)

async def task2():
    while True:
        print("Task 2 running")
        await asyncio.sleep(1)

async def main():
    # Create tasks
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())
    
    # Run for a limited time
    await asyncio.sleep(5)
    
    # Cancel tasks
    task_1.cancel()
    task_2.cancel()
    
    try:
        await asyncio.gather(task_1, task_2)
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Use asyncio.create_task() to run multiple coroutines concurrently within a single thread.

Socket Compatibility

Eventlet:

import eventlet
from eventlet.green import socket

def echo_server(port):
    server = socket.socket()
    server.bind(('0.0.0.0', port))
    server.listen(5)
    while True:
        client_socket, addr = server.accept()
        eventlet.spawn(handle_client, client_socket)

def handle_client(client_socket):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break
        client_socket.sendall(data)
    client_socket.close()

echo_server(6000)

AsyncIO:

import asyncio

async def handle_client(reader, writer):
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '0.0.0.0', 6000
    )
    
    async with server:
        await server.serve_forever()

asyncio.run(main())

Use asyncio.start_server() for TCP servers and asyncio.open_connection() for TCP clients.
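
The client side is not shown above, so here is a minimal sketch of asyncio.open_connection() talking to the same kind of echo server. The echo_once helper is hypothetical, and binding to port 0 (letting the OS pick a free port) is a choice made only to keep the demo self-contained.

```python
import asyncio


async def handle_client(reader, writer):
    # Same echo logic as the server example above.
    while data := await reader.read(1024):
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_once(host, port, message):
    """Connect, send one message, and return the echoed reply."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message)
    await writer.drain()
    reply = await reader.readexactly(len(message))
    writer.close()
    await writer.wait_closed()
    return reply


async def main():
    # Port 0 asks the OS for a free port (demo assumption).
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await echo_once("127.0.0.1", port, b"hello")


if __name__ == "__main__":
    print(asyncio.run(main()))
```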

WebSocket Support

Eventlet:

import eventlet
from eventlet import websocket

@websocket.WebSocketWSGI
def echo(ws):
    while True:
        msg = ws.wait()
        if msg is None:
            break
        ws.send(msg)

server = eventlet.listen(('0.0.0.0', 7000))
eventlet.wsgi.server(server, echo)

AsyncIO:

import asyncio
import websockets

# Recent websockets releases call the handler with the connection only
async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def main():
    async with websockets.serve(echo, "0.0.0.0", 7000):
        await asyncio.Future()  # Run forever

asyncio.run(main())

Use websockets library for WebSocket communication.

Integration with Other Libraries

Eventlet:

import eventlet
from eventlet.green import threading
# Use Eventlet's cooperative queue; the stdlib queue would block the hub
from eventlet import queue

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing {item}")
        q.task_done()

q = queue.Queue()
threads = []
for i in range(3):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in range(10):
    q.put(item)

q.join()

for i in range(3):
    q.put(None)
for t in threads:
    t.join()

AsyncIO:

import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Worker {name} processing {item}")
        await asyncio.sleep(0.1)  # Simulate processing time
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # Create worker tasks
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue))
        for i in range(3)
    ]
    
    # Add items to the queue
    for item in range(10):
        await queue.put(item)
    
    # Wait until all items are processed
    await queue.join()
    
    # Send shutdown signal to workers
    for _ in workers:
        await queue.put(None)
    
    # Wait for workers to finish
    await asyncio.gather(*workers)

asyncio.run(main())

Use asyncio.Queue for task distribution among coroutines.

Tips for Refactoring

  • Start by identifying pure I/O operations that can be made asynchronous
  • Refactor functions into coroutines by adding async to function definitions and await to calls that need to wait
  • Use asyncio.gather() to run multiple coroutines concurrently
  • Be careful with CPU-bound tasks: run them in a concurrent.futures.ProcessPoolExecutor through loop.run_in_executor(); asyncio.to_thread() only suits blocking I/O, because threads still contend for the GIL
  • Keep synchronization in mind - use asyncio.Lock(), asyncio.Semaphore(), and other primitives
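
As a rough sketch of the CPU-bound tip, the code below pushes a deliberately slow function into a process pool with loop.run_in_executor(). The names count_primes and offload are illustrative. The offload helper accepts any executor, and passing None falls back to the loop's default thread pool.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Deliberately CPU-heavy: count primes below `limit` by trial division."""
    return sum(
        1 for n in range(2, limit)
        if all(n % d for d in range(2, int(n ** 0.5) + 1))
    )


async def offload(executor, func, *args):
    # run_in_executor returns an awaitable, so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)


async def main():
    with ProcessPoolExecutor() as pool:
        return await asyncio.gather(
            offload(pool, count_primes, 10_000),
            offload(pool, count_primes, 20_000),
        )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The `if __name__ == "__main__":` guard matters here: on platforms where worker processes are spawned rather than forked, the module is imported again in each worker.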

Advanced Migration Scenarios

Beyond the basic patterns shown above, here are some additional migration scenarios you might encounter:

Handling Monkey Patching

Eventlet:

import eventlet
eventlet.monkey_patch()

import socket
import time

def blocking_operation():
    # This socket operation is patched to be non-blocking
    sock = socket.socket()
    sock.connect(('example.com', 80))
    # This sleep is patched to yield to other green threads
    time.sleep(1)  

AsyncIO:

import asyncio
import time

def some_blocking_function():
    # Placeholder for a call into a library without AsyncIO support
    time.sleep(1)

async def non_blocking_operation():
    # Use AsyncIO's explicit non-blocking operations
    reader, writer = await asyncio.open_connection('example.com', 80)
    # Explicitly yield control with await
    await asyncio.sleep(1)
    writer.close()
    await writer.wait_closed()

    # When dealing with libraries that don't support AsyncIO:
    # Run the blocking call in a worker thread
    result = await asyncio.to_thread(some_blocking_function)

Replace implicit monkey patching with explicit async calls. Use asyncio.to_thread() for unavoidable blocking operations.

Timeouts and Exception Handling

Eventlet:

import eventlet

def with_timeout():
    try:
        with eventlet.Timeout(5):
            # Operation that should timeout
            eventlet.sleep(10)
        print("This won't be reached")
    except eventlet.Timeout:
        print("Operation timed out")

AsyncIO:

import asyncio

async def with_timeout():
    try:
        # Operation that should timeout
        await asyncio.wait_for(asyncio.sleep(10), timeout=5)
        print("This won't be reached")
    except asyncio.TimeoutError:
        print("Operation timed out")
        
    # Alternatively, use shield so the operation keeps running after the timeout
    # (important_operation() stands in for your own coroutine)
    try:
        result = await asyncio.wait_for(
            asyncio.shield(important_operation()), 
            timeout=5
        )
    except asyncio.TimeoutError:
        print("Operation timed out but continues running")

Use asyncio.wait_for() for timeouts and asyncio.shield() to protect critical operations from cancellation.

Testing Async Code

Eventlet:

import unittest
import eventlet

def test_function():
    eventlet.sleep(0.1)
    return "done"

class TestEventlet(unittest.TestCase):
    def test_async_function(self):
        result = test_function()
        self.assertEqual(result, "done")

AsyncIO:

import unittest
import asyncio

async def test_function():
    await asyncio.sleep(0.1)
    return "done"

class TestAsyncIO(unittest.TestCase):
    def test_async_function(self):
        result = asyncio.run(test_function())
        self.assertEqual(result, "done")
        
    # With pytest you can use:
    # @pytest.mark.asyncio
    # async def test_async_function(self):
    #     result = await test_function()
    #     assert result == "done"

Use asyncio.run() in standard unittest tests, or the @pytest.mark.asyncio decorator provided by the pytest-asyncio plugin for async tests.
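
The standard library also offers unittest.IsolatedAsyncioTestCase (Python 3.8+), which lets test methods be coroutines without calling asyncio.run() by hand. A minimal sketch, where compute and TestCompute are illustrative names:

```python
import asyncio
import unittest


async def compute():
    await asyncio.sleep(0.01)
    return "done"


class TestCompute(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        # Runs on the same event loop as the test method.
        self.expected = "done"

    async def test_compute(self):
        self.assertEqual(await compute(), self.expected)
```

Such a test case is collected like any other, for example with `python -m unittest`.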

Resource Pooling

Eventlet:

import eventlet
from eventlet.db_pool import ConnectionPool

def db_connect():
    # Create a database connection
    return db_connection

# Create a pool of database connections
pool = ConnectionPool(db_connect, max_size=10)

def use_db():
    conn = pool.get()
    try:
        # Use the connection
        conn.execute("SELECT * FROM table")
    finally:
        pool.put(conn)

AsyncIO:

import asyncio
import asyncpg

async def use_db():
    # Using connection pools with AsyncIO
    pool = await asyncpg.create_pool(dsn="postgres://user:pass@host/db", 
                                     min_size=5,
                                     max_size=10)
    async with pool.acquire() as conn:
        # Use the connection
        result = await conn.fetch("SELECT * FROM table")
    
    await pool.close()  # Close the pool when done

Use async database libraries like asyncpg, aiomysql, or aiosqlite that provide built-in connection pooling.

Web Framework Integration

Eventlet with Flask:

import eventlet
eventlet.monkey_patch()

from flask import Flask
app = Flask(__name__)

@app.route('/')
def index():
    return "Hello, World!"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

AsyncIO-based Alternative:

# Option 1: Use an async framework like FastAPI
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get('/')
async def index():
    return {"message": "Hello, World!"}

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)

# Option 2: Use ASGI adapters for existing frameworks
# For Flask:
from flask import Flask
from asgiref.wsgi import WsgiToAsgi
import uvicorn

app = Flask(__name__)

@app.route('/')
def index():
    return "Hello, World!"

asgi_app = WsgiToAsgi(app)

if __name__ == '__main__':
    uvicorn.run(asgi_app, host='0.0.0.0', port=8000)

Either migrate to native async frameworks like FastAPI, or use ASGI adapters to run traditional WSGI frameworks with async servers.

Sync-Async Adapter Pattern

The Sync-Async Adapter Pattern enables a library to support both synchronous and asynchronous execution modes, while internally using a unified implementation based on asyncio.

This pattern is particularly suitable for refactoring existing libraries originally based on Eventlet, allowing them to remove their dependency on eventlet.monkey_patch(). Thus, a single library can seamlessly operate both in modern asyncio-based applications and traditional synchronous environments.

Initial Use Case (with Eventlet)

Consider a hypothetical library named hypotheticlib that exposes a synchronous HTTP client:

# hypotheticlib.py
import requests

class Client:
    headers = {
        'User-Agent': 'sync-client'
    }

    def request(self, url):
        response = requests.get(url)
        return response.json()

Currently, some applications utilize Eventlet to make this library implicitly non-blocking:

import eventlet
eventlet.monkey_patch()

from hypotheticlib import Client

client = Client()
result = client.request("https://api.example.com/data")  # Non-blocking due to Eventlet

This approach has several drawbacks:

  • Direct dependency on Eventlet and global monkey-patching.
  • Maintenance difficulties due to monkey-patching side-effects.
  • No native compatibility with asyncio.

Refactoring with the Sync-Async Adapter Pattern

The refactoring goal is to completely replace Eventlet with a native asyncio-based implementation while preserving an interface usable in both synchronous and asynchronous contexts.

Here's an example of how the refactored implementation would look:

import asyncio
import aiohttp

class AsyncClient:
    """Native asyncio-based implementation."""
    headers = {
        'User-Agent': 'async-client'
    }

    async def request(self, url):
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.get(url) as response:
                return await response.json()

class Client(AsyncClient):
    """Adapter enabling synchronous usage of AsyncClient."""

    def _run_coroutine(self, coro):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so drive a new one ourselves.
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(coro)
            finally:
                loop.close()
        # A loop is already running: discard the coroutine and fail explicitly.
        coro.close()
        raise RuntimeError(
            "Cannot run synchronous call when event loop is already running."
        )

    def request(self, url):
        return self._run_coroutine(super().request(url))

Managing asyncio Event Loops

The _run_coroutine() method handles the event loop as follows:

  • It first checks if an event loop (asyncio.get_running_loop()) is currently active.
  • If no loop is active, it creates and starts a new loop using asyncio.new_event_loop() to synchronously execute the coroutine with run_until_complete().
  • If an active loop is detected, it raises an explicit error, as synchronous calls cannot be executed in this context without additional handling.

This approach keeps loop ownership explicit: the adapter only drives a loop it created itself and never interferes with a loop that is already running.

Examples of Usage after Refactoring

Explicit asynchronous usage (asyncio):

import asyncio
from hypotheticlib import AsyncClient

async def main():
    client = AsyncClient()
    result = await client.request("https://api.example.com/data")
    print(result)

asyncio.run(main())

Traditional synchronous usage:

from hypotheticlib import Client

client = Client()
result = client.request("https://api.example.com/data")
print(result)

Why is this pattern effective for replacing Eventlet?

Complete removal of Eventlet monkey-patching

No more global dynamic patching, significantly reducing side-effects.

Transparent compatibility with asyncio

Enables direct integration into modern applications that follow the async/await paradigm.

Interoperability with existing code

Allows the same library to be equally usable in traditional synchronous contexts and in modern async scenarios.

Limitations and considerations

  • The adapter cannot execute synchronous calls if an asyncio event loop is already running in the calling thread. In such scenarios, explicit asynchronous usage is required.
  • Long-running CPU-bound operations are not recommended within this pattern, as they can block the event loop.
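
When a synchronous call sits in code that cannot be converted yet, one possible workaround for the first limitation is to run the adapter in a worker thread, where no event loop is running. The sketch below uses stand-ins (fetch and fetch_sync are hypothetical and perform no network access) to show both the failure and the workaround. Awaiting the async client directly remains the preferred option.

```python
import asyncio


async def fetch(url):
    # Stand-in for AsyncClient.request(); no network involved.
    await asyncio.sleep(0.01)
    return {"url": url}


def fetch_sync(url):
    """Synchronous adapter: only valid when no loop runs in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(fetch(url))
    raise RuntimeError("event loop already running in this thread")


async def main():
    # Calling the adapter directly from a coroutine fails...
    try:
        fetch_sync("https://api.example.com/data")
        direct_call_failed = False
    except RuntimeError:
        direct_call_failed = True

    # ...but a worker thread has no running loop, so the adapter works there.
    result = await asyncio.to_thread(fetch_sync, "https://api.example.com/data")
    return direct_call_failed, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```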

Last words about this pattern

This pattern is inspired by a Python bug discussion and by the edgedb-python (now named gel-python) library, whose usage examples illustrate it well.

The Sync-Async Adapter Pattern simplifies refactoring Eventlet-dependent libraries towards a native asyncio architecture while maintaining backward compatibility with traditional synchronous use-cases. This approach enables smooth migration towards modern async environments, simplifies maintenance, and increases overall application robustness.

AsyncIO Libraries

There are plenty of AsyncIO-based libraries. We invite you to have a look at this curated list of AsyncIO libraries.

Common Pitfalls

Be aware of the following common pitfalls during the migration:

Blocking calls within async functions

A blocking call inside a coroutine stalls the whole event loop: no other task makes progress until the call returns. Ensure that all I/O operations are non-blocking and use the appropriate AsyncIO methods, or move unavoidable blocking calls to a thread.
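
The effect is easy to observe. In this sketch (count_ticks, blocking, and offloaded are illustrative names), a heartbeat task tries to tick every 10 ms while some work runs. With time.sleep() inside the coroutine, the heartbeat never gets a turn; with asyncio.to_thread(), it keeps ticking.

```python
import asyncio
import time


async def count_ticks(work):
    """Count how often a 10 ms heartbeat fires while `work` runs."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start
    await work()
    beat.cancel()
    return ticks


async def blocking():
    time.sleep(0.3)  # blocks the whole event loop


async def offloaded():
    await asyncio.to_thread(time.sleep, 0.3)  # blocks only a worker thread


if __name__ == "__main__":
    print("blocking: ", asyncio.run(count_ticks(blocking)))
    print("offloaded:", asyncio.run(count_ticks(offloaded)))
```

The blocking variant reports zero ticks, while the exact count for the offloaded variant depends on timer precision and machine load.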

Improper use of async/await syntax

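Incorrect usage of async/await can lead to unexpected behavior and bugs. A common mistake is calling a coroutine function without await, which returns a coroutine object instead of running it. Familiarize yourself with the correct patterns and practices for using async/await in Python.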

Concurrency issues and race conditions

Concurrency issues and race conditions can arise when multiple tasks access shared resources. Use synchronization primitives like locks, semaphores, and queues to manage access to shared resources.
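
Even in a single thread, a race appears as soon as an await sits between reading and writing shared state. The following sketch (run_increments is a hypothetical helper) makes the lost updates reproducible and shows how asyncio.Lock prevents them:

```python
import asyncio


async def run_increments(use_lock, workers=10):
    """Have `workers` tasks each add 1 to a shared counter."""
    counter = 0
    lock = asyncio.Lock()

    async def increment():
        nonlocal counter
        current = counter
        await asyncio.sleep(0)  # a suspension point between read and write
        counter = current + 1

    async def locked_increment():
        async with lock:
            await increment()

    job = locked_increment if use_lock else increment
    await asyncio.gather(*(job() for _ in range(workers)))
    return counter


if __name__ == "__main__":
    print("without lock:", asyncio.run(run_increments(use_lock=False)))
    print("with lock:   ", asyncio.run(run_increments(use_lock=True)))
```

Without the lock, every task reads the counter before any of them writes it back, so the final value is 1 instead of 10. With the lock, each read-modify-write completes before the next one starts.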

Performance bottlenecks due to improper use of AsyncIO features

Improper use of AsyncIO features can lead to performance bottlenecks. Profile your code to identify slow parts and optimize them by using efficient AsyncIO patterns and techniques.

Conclusion

Migrating to AsyncIO can significantly improve the performance and scalability of your application. Follow the steps outlined in this guide and be mindful of common pitfalls to ensure a smooth migration process.