Eventlet Removal Logo
Eventlet Removal

Managing Migration Depth

In large applications, Eventlet dependencies can become deeply embedded throughout the codebase, making a complete migration to AsyncIO challenging. This chapter explores strategies to manage migration depth, with a special focus on using Awaitlet to limit the impact of your refactoring efforts.

Understanding Migration Depth 🔗

When migrating from Eventlet to AsyncIO, one of the most significant challenges is dealing with the propagation of async/await keywords throughout your codebase. This is what we call the "migration depth" problem.

The Async Propagation Challenge

In AsyncIO, when a function uses await, it must be defined with async def. Any function that calls an async function must also be async, creating a chain reaction throughout your codebase.

# The await keyword propagates up the call stack
async def low_level_io():
    # Async I/O operation
    return await asyncio.sleep(1)

async def mid_level_function():
    # Must be async to call low_level_io
    return await low_level_io()

async def high_level_function():
    # Must be async to call mid_level_function
    return await mid_level_function()

# Your entire call stack becomes async

For large applications with deeply nested call hierarchies, this can lead to:

  • Extensive refactoring: Changes ripple throughout your codebase, affecting far more code than you initially intended.
  • Complex transitions: APIs that span multiple modules or services become difficult to migrate incrementally.
  • Mixed compatibility issues: During migration, maintaining compatibility between async and sync code becomes challenging.
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#312e81', 'primaryTextColor': '#000', 'primaryBorderColor': '#433e99', 'lineColor': '#ffffff', 'secondaryColor': '#312e81', 'tertiaryColor': '#312e81' }}}%% graph TD A[API Endpoint] --> B[Service Layer] B --> C[Business Logic] C --> D[Data Access] D --> E[Low-level I/O] style E fill:#e11d48,stroke:#f43f5e,primaryTextColor:#fff style D fill:#fb7185,stroke:#f43f5e style C fill:#fda4af,stroke:#f43f5e style B fill:#fecdd3,stroke:#f43f5e style A fill:#fff1f2,stroke:#f43f5e subgraph Propagation F["Low-level async change
(e.g., network I/O)"] G["Forces changes to propagate
up the entire call stack"] end E -.-> F F -.-> G

This is where the Awaitlet library comes in, offering a solution to contain the async propagation and manage the depth of your migration more effectively.

Awaitlet as a Migration Tool 🔗

Awaitlet is a library that helps bridge the gap between synchronous and asynchronous code. Its primary purpose is to limit the "async infection" that naturally occurs when migrating to AsyncIO.

What Awaitlet Does

Awaitlet allows you to call asynchronous functions from synchronous code without having to make the calling function async. It acts as a containment mechanism for async/await propagation.

import awaitlet

# A regular synchronous function
def sync_function():
    # Call an async function without making sync_function async
    result = awaitlet.awaitlet(async_function())
    return result

# An async function that can be called from sync code
async def async_function():
    await asyncio.sleep(1)
    return "result"

Key Benefits for Eventlet Migration

Incremental Migration

Migrate your codebase piece by piece without having to refactor everything at once.

Controlled Refactoring

Limit the scope of async changes to only the parts that truly benefit from async I/O.

API Compatibility

Maintain backward compatibility with existing synchronous APIs while introducing async functionality.

Awaitlet essentially provides containment for your AsyncIO changes, allowing you to create "async islands" within your otherwise synchronous codebase.

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#312e81', 'primaryTextColor': '#fff', 'primaryBorderColor': '#433e99', 'lineColor': '#ffffff', 'secondaryColor': '#312e81', 'tertiaryColor': '#312e81' }}}%% graph TD A[Synchronous API Endpoint] --> B[Synchronous Service Layer] B --> C[Synchronous Business Logic] subgraph "Async Island" D[Data Access Layer
with await_] E[Async Low-level I/O] end C --> D D --> E style D fill:#059669,stroke:#10b981 style E fill:#059669,stroke:#10b981

Implementing Awaitlet in Your Migration 🔗

Let's explore how to implement Awaitlet in your Eventlet migration project with practical examples.

Installation and Setup

pip install awaitlet

Basic Usage Pattern

import awaitlet
import asyncio

# The async function you want to call
async def fetch_data_async(url):
    # Some async operation that replaces an Eventlet call
    await asyncio.sleep(1)  # Simulate network I/O
    return f"Data from {url}"

# Your existing synchronous function
def process_data(url):
    # Instead of using eventlet.spawn, use awaitlet
    data = awaitlet.awaitlet(fetch_data_async(url))
    return data

# Usage remains unchanged
result = process_data("https://example.com")

Migration Example: From Eventlet to AsyncIO with Awaitlet

Original Eventlet Code

import eventlet
from eventlet.green import urllib2

def fetch_data(url):
    return urllib2.urlopen(url).read()

def process_urls(urls):
    pool = eventlet.GreenPool()
    results = []
    for body in pool.imap(fetch_data, urls):
        results.append(body)
    return results

# Usage
urls = ['http://example.com', 'http://example.org']
results = process_urls(urls)

Migrated with Awaitlet

import asyncio
import aiohttp
import awaitlet

async def fetch_data_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()

def fetch_data(url):
    # Individual calls use awaitlet to bridge sync and async
    return awaitlet.awaitlet(fetch_data_async(url))

def process_urls(urls):
    # Function remains synchronous
    results = []
    for url in urls:
        results.append(fetch_data(url))
    return results

# Usage remains unchanged
urls = ['http://example.com', 'http://example.org']
results = process_urls(urls)

This approach allows you to introduce AsyncIO at the lower levels where I/O operations happen while keeping your API and business logic layers unchanged.

Handling Multiple Async Calls

For concurrent operations (similar to Eventlet's GreenPool), Awaitlet can work with gathered tasks:

import awaitlet
import asyncio
import aiohttp

async def fetch_all_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch_one_async(session, url))
        return await asyncio.gather(*tasks)

async def fetch_one_async(session, url):
    async with session.get(url) as response:
        return await response.text()

def process_urls(urls):
    # Single awaitlet call for multiple concurrent async operations
    results = awaitlet.awaitlet(fetch_all_async(urls))
    return results

# Usage remains unchanged
urls = ['http://example.com', 'http://example.org']
results = process_urls(urls)

Managing Deep Dependencies 🔗

For applications with deeply nested dependencies on Eventlet, Awaitlet enables a targeted migration strategy that minimizes refactoring impact.

The Depth Containment Strategy

1. Identify I/O Boundaries

Focus on identifying the lowest layers in your application where I/O operations occur - these are ideal points to introduce AsyncIO.

  • Database access layers
  • External API clients
  • File system operations
  • Network communication components

2. Create Async Wrappers

Create async versions of your I/O operations, but keep the original synchronous API intact by using Awaitlet.

import awaitlet

async def db_query_async(query):
    # Async implementation
    
def db_query(query):
    # Keep original API signature
    return awaitlet.awaitlet(db_query_async(query))

3. Introduce Async Incrementally

Begin with the most critical I/O bottlenecks and gradually expand your AsyncIO implementation where it provides the most benefit.

import awaitlet

# Phase 1: High-impact components 
def critical_operation():
    # Changed to use awaitlet
    return awaitlet.awaitlet(critical_async())

# Phase 2: Secondary components
def less_critical_operation():
    # Still using eventlet for now
    return eventlet.spawn(operation).wait()

4. Optional: Expose Async APIs

For parts of your application that would benefit from an async interface, consider exposing dual APIs:

import awaitlet

# Synchronous interface (original)
def get_user(user_id):
    return awaitlet.awaitlet(get_user_async(user_id))

# Async interface (new)
async def get_user_async(user_id):
    # Async implementation

Example: Refactoring a Deep Call Chain

import awaitlet
import asyncio

# Original deep call chain with Eventlet dependencies
def api_endpoint(request):
    data = service_layer(request.params)
    return {"result": data}

def service_layer(params):
    return business_logic(params)

def business_logic(params):
    return data_access_layer(params)

def data_access_layer(params):
    # This uses Eventlet for I/O
    import eventlet
    return eventlet.spawn(fetch_from_db, params).wait()

def fetch_from_db(params):
    # Simulated DB query
    eventlet.sleep(0.1)
    return {"id": params.get("id"), "value": "data"}

# Refactored with Awaitlet - only changing the lower layers
def api_endpoint(request):
    # Unchanged
    data = service_layer(request.params)
    return {"result": data}

def service_layer(params):
    # Unchanged
    return business_logic(params)

def business_logic(params):
    # Unchanged
    return data_access_layer(params)

def data_access_layer(params):
    # Changed to use awaitlet instead of eventlet
    return awaitlet.awaitlet(fetch_from_db_async(params))

async def fetch_from_db_async(params):
    # Async implementation
    await asyncio.sleep(0.1)
    return {"id": params.get("id"), "value": "data"}

Notice how the higher-level functions remain completely unchanged, while only the I/O layer has been modified to use AsyncIO.

Using Awaitlet with Internal Private APIs 🔗

Internal private APIs present a particular challenge during migration because they often serve as integration points between different components of your application. Awaitlet can be particularly valuable in these scenarios.

Extending the Migration Interface Pattern

Building on the migration interface pattern described earlier, Awaitlet enhances this approach:

Creating an API Gateway with Awaitlet

# api_gateway.py
import awaitlet

class InternalAPIGateway:
    """Migration interface that abstracts implementation details."""
    
    def fetch_data(self, resource_id):
        # Version 1: During migration, call the async implementation
        # without requiring callers to change
        return awaitlet.awaitlet(self._fetch_data_async(resource_id))
    
    async def _fetch_data_async(self, resource_id):
        # New async implementation that replaces Eventlet
        # This can be gradually enhanced with AsyncIO features
        import asyncio
        await asyncio.sleep(0.1)  # Simulated async I/O
        return {"id": resource_id, "data": "result"}
    
    # More API methods following the same pattern
    def save_data(self, resource_id, data):
        return awaitlet.awaitlet(self._save_data_async(resource_id, data))
        
    async def _save_data_async(self, resource_id, data):
        # Async implementation
        import asyncio
        await asyncio.sleep(0.1)
        return {"status": "saved", "id": resource_id}

Practical Migration Strategy for Internal APIs

1. Create a Dual-Mode API Layer

The API gateway provides both sync and async interfaces:

import awaitlet

class DatabaseAPI:
    # Sync interface (for backward compatibility)
    def get_user(self, user_id):
        return awaitlet.awaitlet(self.get_user_async(user_id))
    
    # Async interface (for new async consumers)
    async def get_user_async(self, user_id):
        # Async implementation

2. Incremental Consumer Migration

Consumers can gradually migrate to the async API at their own pace:

# Legacy consumer (continues to work)
def process_user(user_id):
    user = db_api.get_user(user_id)
    return user

# New async consumer
async def process_user_async(user_id):
    user = await db_api.get_user_async(user_id)
    return user

Case Example: Migrating an Internal RPC System

import awaitlet
import asyncio

# Original implementation using Eventlet
class RPCClientOriginal:
    def __init__(self):
        import eventlet
        self.pool = eventlet.GreenPool()
    
    def call_remote(self, method, *args, **kwargs):
        def _execute():
            # Simulate RPC call with eventlet
            import eventlet
            eventlet.sleep(0.1)
            return {'result': f"Response from {method}"}
        
        return self.pool.spawn(_execute).wait()

# Migrated implementation with Awaitlet
class RPCClient:
    def call_remote(self, method, *args, **kwargs):
        # Same interface, but using awaitlet internally
        return awaitlet.awaitlet(self._call_remote_async(method, *args, **kwargs))
    
    async def _call_remote_async(self, method, *args, **kwargs):
        # Async implementation
        await asyncio.sleep(0.1)
        return {'result': f"Response from {method}"}
    
    # Optionally expose async API for new consumers
    async def call_remote_async(self, method, *args, **kwargs):
        return await self._call_remote_async(method, *args, **kwargs)

# Usage remains unchanged for existing code
client = RPCClient()
result = client.call_remote("get_data", id=123)

# New async code can use the async API
async def new_consumer():
    client = RPCClient()
    result = await client.call_remote_async("get_data", id=123)
    return result

This approach allows you to maintain API compatibility while migrating the underlying implementation from Eventlet to AsyncIO.

Best Practices and Limitations 🔗

Best Practices

Keep awaitlet Close to I/O

Place Awaitlet calls as close as possible to the actual I/O operations to minimize performance overhead and make the code easier to understand.

import awaitlet

# Good practice
def get_user(user_id):
    # awaitlet used close to the I/O operation
    return awaitlet.awaitlet(db_query_async(f"SELECT * FROM users WHERE id={user_id}"))

Avoid Excessive Nesting

Instead of nesting multiple async calls within a single awaitlet call, combine them into a single async function.

import awaitlet

# Avoid this
def process():
    result1 = awaitlet.awaitlet(async_operation1())
    result2 = awaitlet.awaitlet(async_operation2(result1))
    return result2

# Better approach
async def process_async():
    result1 = await async_operation1()
    result2 = await async_operation2(result1)
    return result2

def process():
    return awaitlet.awaitlet(process_async())

Consider Thread Safety

Awaitlet creates a new event loop in a separate thread for each call. Be mindful of thread safety when accessing shared resources.

Document Transition Points

Clearly document where sync-to-async transitions occur in your codebase to aid maintenance and future refactoring.

Limitations and Considerations

Performance Overhead

Awaitlet incurs some overhead due to thread creation and synchronization. It's most efficient for I/O-bound operations where the async benefit outweighs this cost.

Cannot Use in Async Context

The awaitlet.awaitlet() function cannot be used inside an async function. It's designed as a bridge from sync to async, not the reverse.

Long-Term Strategy

While Awaitlet is excellent for migration, consider whether your long-term architecture should eventually move to fully async patterns for critical paths.

Case Study: Incremental Migration of a Complex Application 🔗

Let's walk through a realistic scenario of migrating a complex application with deeply nested Eventlet dependencies using Awaitlet.

Initial Architecture

A large application with multiple services and deep Eventlet usage:

  • API Server with multiple endpoints
  • Worker services processing jobs in the background
  • Shared internal libraries used by all components
  • Extensive use of Eventlet for concurrency throughout the stack

Migration Strategy with Awaitlet

Phase 1: Create Async Versions of Low-Level Libraries

import awaitlet

# Original Eventlet-based database client
class DBClient:
    def query(self, sql):
        import eventlet
        # Eventlet-based implementation
        conn = self.get_connection()
        return eventlet.spawn(conn.execute, sql).wait()

# Migrated version with Awaitlet
class DBClient:
    def query(self, sql):
        # Same interface, but using awaitlet internally
        return awaitlet.awaitlet(self._query_async(sql))
    
    async def _query_async(self, sql):
        # AsyncIO implementation using asyncpg or similar
        import asyncpg
        conn = await self.get_connection_async()
        return await conn.fetch(sql)
    
    # Optionally expose async API
    async def query_async(self, sql):
        return await self._query_async(sql)

Phase 2: Migrate Shared Services

import awaitlet

# Shared cache service with dual interface
class CacheService:
    def __init__(self):
        # Initialize connection pool or similar
        pass
    
    def get(self, key):
        # Sync interface using Awaitlet
        return awaitlet.awaitlet(self._get_async(key))
    
    async def _get_async(self, key):
        # Async implementation
        import aioredis
        redis = await aioredis.create_redis_pool('redis://localhost')
        value = await redis.get(key)
        redis.close()
        await redis.wait_closed()
        return value
    
    # Similar pattern for set, delete, etc.

Phase 3: Optionally Expose Async APIs for New Code

import awaitlet

# New async-aware service that uses async APIs directly
class NewAnalyticsService:
    def __init__(self, db_client, cache_service):
        self.db = db_client
        self.cache = cache_service
    
    async def process_data_async(self, data_id):
        # Use async APIs directly for better performance
        cached_result = await self.cache._get_async(f"analytics:{data_id}")
        if cached_result:
            return cached_result
            
        result = await self.db._query_async(f"SELECT * FROM analytics WHERE id={data_id}")
        # Process result asynchronously
        return result

Phase 4: Gradually Move to Pure AsyncIO Where Beneficial

# Example: Converting an API endpoint to fully async
from aiohttp import web

# Async API handler
async def handle_data_request(request):
    data_id = request.match_info.get('id')
    service = request.app['analytics_service']
    
    # Directly use async methods
    result = await service.process_data_async(data_id)
    return web.json_response(result)

# Configure async web app
app = web.Application()
app.router.add_get('/data/{id}', handle_data_request)

# Initialize services
async def init_app():
    db_client = DBClient()
    cache_service = CacheService()
    app['analytics_service'] = NewAnalyticsService(db_client, cache_service)

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8080, on_startup=[init_app])

Results and Benefits

Managed Migration Depth

The use of Awaitlet contained the async changes to specific layers, preventing widespread refactoring.

Incremental Adoption

Each component could be migrated independently without breaking existing functionality.

Improved Architecture

The migration resulted in a cleaner separation of concerns and better performance for I/O-bound operations.

Conclusion 🔗

Managing migration depth is one of the most challenging aspects of moving from Eventlet to AsyncIO, especially in large, complex applications. Awaitlet provides a practical solution by:

  • Containing the "async infection" to manageable areas of your codebase
  • Enabling incremental migration without disrupting existing functionality
  • Providing a bridge between synchronous and asynchronous code
  • Supporting the migration of applications with internal private APIs

By strategically using Awaitlet, you can transform deeply embedded Eventlet dependencies into modern AsyncIO code with minimal disruption to your application architecture.

Next Steps

After successfully managing your migration depth with Awaitlet, consider these next steps: