Managing Migration Depth
In large applications, Eventlet dependencies can become deeply embedded throughout the codebase, making a complete migration to AsyncIO challenging. This chapter explores strategies to manage migration depth, with a special focus on using Awaitlet to limit the impact of your refactoring efforts.
Understanding Migration Depth
When migrating from Eventlet to AsyncIO, one of the most significant challenges is dealing with the propagation of async/await keywords throughout your codebase. This is what we call the "migration depth" problem.
The Async Propagation Challenge
In AsyncIO, when a function uses await, it must be defined with async def. Any function that calls an async function and needs its result must also be async, creating a chain reaction throughout your codebase.
import asyncio

# The await keyword propagates up the call stack
async def low_level_io():
    # Async I/O operation
    return await asyncio.sleep(1)

async def mid_level_function():
    # Must be async to call low_level_io
    return await low_level_io()

async def high_level_function():
    # Must be async to call mid_level_function
    return await mid_level_function()

# Your entire call stack becomes async
For large applications with deeply nested call hierarchies, this can lead to:
- Extensive refactoring: Changes ripple throughout your codebase, affecting far more code than you initially intended.
- Complex transitions: APIs that span multiple modules or services become difficult to migrate incrementally.
- Mixed compatibility issues: During migration, maintaining compatibility between async and sync code becomes challenging.
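To see why the chain reaction is forced, it helps to look at what a synchronous caller actually gets back from an async function. The sketch below uses only the standard library; the function names are illustrative and not taken from any real codebase.

```python
import asyncio
import inspect


async def low_level_io():
    # Stand-in for a real async I/O call
    await asyncio.sleep(0.01)
    return "payload"


def sync_caller():
    # Without await, calling an async function only creates a coroutine
    # object; the body of low_level_io() has not run yet.
    coro = low_level_io()
    is_coroutine = inspect.iscoroutine(coro)
    coro.close()  # avoid a "coroutine was never awaited" warning
    return is_coroutine


async def async_caller():
    # Only an async function can await the coroutine to obtain its result
    return await low_level_io()


got_coroutine = sync_caller()
payload = asyncio.run(async_caller())
```

The synchronous caller never sees the payload, which is why every caller that needs the result ends up being converted to async def unless a bridge is introduced.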
This is where the Awaitlet library comes in, offering a solution to contain the async propagation and manage the depth of your migration more effectively.
Awaitlet as a Migration Tool
Awaitlet is a library that helps bridge the gap between synchronous and asynchronous code. Its primary purpose is to limit the "async infection" that naturally occurs when migrating to AsyncIO.
What Awaitlet Does
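Awaitlet allows you to call asynchronous functions from synchronous code without having to make the calling function async. It acts as a containment mechanism for async/await propagation. The one requirement is that the synchronous code is itself started from the event loop through awaitlet.async_def(); awaitlet.awaitlet() then uses a greenlet switch to hand the awaitable to that loop and resume the synchronous function with the result.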
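import asyncio
import awaitlet

# An async function that can be called from sync code
async def async_function():
    await asyncio.sleep(1)
    return "result"

# A regular synchronous function
def sync_function():
    # Call an async function without making sync_function async
    result = awaitlet.awaitlet(async_function())
    return result

# awaitlet() only works while sync_function runs under awaitlet.async_def(),
# so the program's entry point hands it to the event loop
print(asyncio.run(awaitlet.async_def(sync_function)))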
Key Benefits for Eventlet Migration
Incremental Migration
Migrate your codebase piece by piece without having to refactor everything at once.
Controlled Refactoring
Limit the scope of async changes to only the parts that truly benefit from async I/O.
API Compatibility
Maintain backward compatibility with existing synchronous APIs while introducing async functionality.
Awaitlet essentially provides containment for your AsyncIO changes, allowing you to create "async islands" within your otherwise synchronous codebase.
Implementing Awaitlet in Your Migration
Let's explore how to implement Awaitlet in your Eventlet migration project with practical examples.
Installation and Setup
pip install awaitlet
Basic Usage Pattern
import asyncio
import awaitlet

# The async function you want to call
async def fetch_data_async(url):
    # Some async operation that replaces an Eventlet call
    await asyncio.sleep(1)  # Simulate network I/O
    return f"Data from {url}"

# Your existing synchronous function
def process_data(url):
    # Instead of using eventlet.spawn, use awaitlet
    data = awaitlet.awaitlet(fetch_data_async(url))
    return data

# process_data itself is unchanged; only the entry point runs it under
# awaitlet.async_def() so that awaitlet() has an event loop to yield to
result = asyncio.run(awaitlet.async_def(process_data, "https://example.com"))
Migration Example: From Eventlet to AsyncIO with Awaitlet
Original Eventlet Code
import eventlet
# Python 3 location of the green urlopen (urllib2 only exists on Python 2)
from eventlet.green.urllib.request import urlopen

def fetch_data(url):
    return urlopen(url).read()

def process_urls(urls):
    pool = eventlet.GreenPool()
    results = []
    for body in pool.imap(fetch_data, urls):
        results.append(body)
    return results

# Usage
urls = ['http://example.com', 'http://example.org']
results = process_urls(urls)
Migrated with Awaitlet
import asyncio
import aiohttp
import awaitlet

async def fetch_data_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()

def fetch_data(url):
    # Individual calls use awaitlet to bridge sync and async
    return awaitlet.awaitlet(fetch_data_async(url))

def process_urls(urls):
    # Function remains synchronous
    results = []
    for url in urls:
        results.append(fetch_data(url))
    return results

# process_urls and its callers are unchanged; the entry point runs it
# under awaitlet.async_def()
urls = ['http://example.com', 'http://example.org']
results = asyncio.run(awaitlet.async_def(process_urls, urls))
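This approach allows you to introduce AsyncIO at the lower levels where I/O operations happen while keeping your API and business logic layers unchanged. Note that this version fetches the URLs one after another, so it gives up the concurrency that GreenPool provided; the next section shows how to get it back.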
Handling Multiple Async Calls
For concurrent operations (similar to Eventlet's GreenPool), Awaitlet can work with gathered tasks:
import asyncio
import aiohttp
import awaitlet

async def fetch_one_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch_one_async(session, url))
        return await asyncio.gather(*tasks)

def process_urls(urls):
    # Single awaitlet call for multiple concurrent async operations
    results = awaitlet.awaitlet(fetch_all_async(urls))
    return results

# process_urls and its callers are unchanged; the entry point runs it
# under awaitlet.async_def()
urls = ['http://example.com', 'http://example.org']
results = asyncio.run(awaitlet.async_def(process_urls, urls))
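One difference from GreenPool is worth noting: asyncio.gather starts every coroutine at once, whereas a GreenPool caps the number of greenthreads at its size. A minimal sketch of how that cap could be reproduced with asyncio.Semaphore follows; the `fetch_one`, `fetch_all_bounded`, and `stats` names are hypothetical, and asyncio.sleep stands in for real network I/O.

```python
import asyncio


async def fetch_one(url, limiter, stats):
    # The semaphore caps in-flight fetches, much like GreenPool(size=...)
    async with limiter:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for real network I/O
        stats["active"] -= 1
        return f"body of {url}"


async def fetch_all_bounded(urls, max_concurrency=2):
    limiter = asyncio.Semaphore(max_concurrency)
    stats = {"active": 0, "peak": 0}
    # gather() returns results in the same order as the input URLs
    bodies = await asyncio.gather(
        *(fetch_one(url, limiter, stats) for url in urls)
    )
    return bodies, stats["peak"]


urls = [f"http://example.com/{i}" for i in range(5)]
bodies, peak = asyncio.run(fetch_all_bounded(urls))
```

The `stats` dictionary exists only to make the cap observable; a real implementation would drop it and keep just the semaphore.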
Managing Deep Dependencies
For applications with deeply nested dependencies on Eventlet, Awaitlet enables a targeted migration strategy that minimizes refactoring impact.
The Depth Containment Strategy
1. Identify I/O Boundaries
Focus on identifying the lowest layers in your application where I/O operations occur - these are ideal points to introduce AsyncIO.
- Database access layers
- External API clients
- File system operations
- Network communication components
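One way to locate these boundaries is to list which modules import Eventlet at all. The helper below is a hypothetical sketch built on the standard library's ast module; it inspects source text only and does not detect usage that reaches Eventlet indirectly, for example through monkey patching.

```python
import ast


def eventlet_imports(source):
    """Return the sorted eventlet module names imported by Python source."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif (
            isinstance(node, ast.ImportFrom)
            and node.module
            and node.level == 0  # skip relative imports
        ):
            names = [node.module]
        else:
            continue
        found.update(
            name for name in names
            if name == "eventlet" or name.startswith("eventlet.")
        )
    return sorted(found)


sample = '''
import os
import eventlet
from eventlet.green import socket

def handler():
    import eventlet.tpool
'''
hits = eventlet_imports(sample)
```

Running the helper over every file in a package gives a rough map of where Eventlet enters the codebase, which is a reasonable starting list of candidate I/O boundaries.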
2. Create Async Wrappers
Create async versions of your I/O operations, but keep the original synchronous API intact by using Awaitlet.
import awaitlet

async def db_query_async(query):
    # Async implementation
    ...

def db_query(query):
    # Keep original API signature
    return awaitlet.awaitlet(db_query_async(query))
3. Introduce Async Incrementally
Begin with the most critical I/O bottlenecks and gradually expand your AsyncIO implementation where it provides the most benefit.
import awaitlet
import eventlet

# Phase 1: High-impact components
def critical_operation():
    # Changed to use awaitlet
    return awaitlet.awaitlet(critical_async())

# Phase 2: Secondary components
def less_critical_operation():
    # Still using eventlet for now
    return eventlet.spawn(operation).wait()
4. Optional: Expose Async APIs
For parts of your application that would benefit from an async interface, consider exposing dual APIs:
import awaitlet

# Synchronous interface (original)
def get_user(user_id):
    return awaitlet.awaitlet(get_user_async(user_id))

# Async interface (new)
async def get_user_async(user_id):
    # Async implementation
    ...
Example: Refactoring a Deep Call Chain
import asyncio
import awaitlet
import eventlet

# Original deep call chain with Eventlet dependencies
def api_endpoint(request):
    data = service_layer(request.params)
    return {"result": data}

def service_layer(params):
    return business_logic(params)

def business_logic(params):
    return data_access_layer(params)

def data_access_layer(params):
    # This uses Eventlet for I/O
    return eventlet.spawn(fetch_from_db, params).wait()

def fetch_from_db(params):
    # Simulated DB query
    eventlet.sleep(0.1)
    return {"id": params.get("id"), "value": "data"}

# Refactored with Awaitlet - only changing the lower layers
def api_endpoint(request):
    # Unchanged
    data = service_layer(request.params)
    return {"result": data}

def service_layer(params):
    # Unchanged
    return business_logic(params)

def business_logic(params):
    # Unchanged
    return data_access_layer(params)

def data_access_layer(params):
    # Changed to use awaitlet instead of eventlet
    return awaitlet.awaitlet(fetch_from_db_async(params))

async def fetch_from_db_async(params):
    # Async implementation
    await asyncio.sleep(0.1)
    return {"id": params.get("id"), "value": "data"}
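Notice how the higher-level functions remain completely unchanged, while only the I/O layer has been modified to use AsyncIO. The one other place that changes is the entry point that invokes api_endpoint, which must run it through awaitlet.async_def() on the event loop.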
Using Awaitlet with Internal Private APIs
Internal private APIs present a particular challenge during migration because they often serve as integration points between different components of your application. Awaitlet can be particularly valuable in these scenarios.
Extending the Migration Interface Pattern
Building on the migration interface pattern described earlier, Awaitlet enhances this approach:
Creating an API Gateway with Awaitlet
# api_gateway.py
import asyncio
import awaitlet

class InternalAPIGateway:
    """Migration interface that abstracts implementation details."""

    def fetch_data(self, resource_id):
        # Version 1: During migration, call the async implementation
        # without requiring callers to change
        return awaitlet.awaitlet(self._fetch_data_async(resource_id))

    async def _fetch_data_async(self, resource_id):
        # New async implementation that replaces Eventlet
        # This can be gradually enhanced with AsyncIO features
        await asyncio.sleep(0.1)  # Simulated async I/O
        return {"id": resource_id, "data": "result"}

    # More API methods following the same pattern
    def save_data(self, resource_id, data):
        return awaitlet.awaitlet(self._save_data_async(resource_id, data))

    async def _save_data_async(self, resource_id, data):
        # Async implementation
        await asyncio.sleep(0.1)
        return {"status": "saved", "id": resource_id}
Practical Migration Strategy for Internal APIs
1. Create a Dual-Mode API Layer
The API gateway provides both sync and async interfaces:
import awaitlet

class DatabaseAPI:
    # Sync interface (for backward compatibility)
    def get_user(self, user_id):
        return awaitlet.awaitlet(self.get_user_async(user_id))

    # Async interface (for new async consumers)
    async def get_user_async(self, user_id):
        # Async implementation
        ...
2. Incremental Consumer Migration
Consumers can gradually migrate to the async API at their own pace:
# db_api is assumed to be a shared DatabaseAPI instance

# Legacy consumer (continues to work)
def process_user(user_id):
    user = db_api.get_user(user_id)
    return user

# New async consumer
async def process_user_async(user_id):
    user = await db_api.get_user_async(user_id)
    return user
Case Example: Migrating an Internal RPC System
import asyncio
import awaitlet
import eventlet

# Original implementation using Eventlet
class RPCClientOriginal:
    def __init__(self):
        self.pool = eventlet.GreenPool()

    def call_remote(self, method, *args, **kwargs):
        def _execute():
            # Simulate RPC call with eventlet
            eventlet.sleep(0.1)
            return {'result': f"Response from {method}"}
        return self.pool.spawn(_execute).wait()

# Migrated implementation with Awaitlet
class RPCClient:
    def call_remote(self, method, *args, **kwargs):
        # Same interface, but using awaitlet internally
        return awaitlet.awaitlet(self._call_remote_async(method, *args, **kwargs))

    async def _call_remote_async(self, method, *args, **kwargs):
        # Async implementation
        await asyncio.sleep(0.1)
        return {'result': f"Response from {method}"}

    # Optionally expose async API for new consumers
    async def call_remote_async(self, method, *args, **kwargs):
        return await self._call_remote_async(method, *args, **kwargs)

# Existing code keeps the same call; only the entry point runs it
# under awaitlet.async_def()
def legacy_consumer():
    client = RPCClient()
    return client.call_remote("get_data", id=123)

result = asyncio.run(awaitlet.async_def(legacy_consumer))

# New async code can use the async API
async def new_consumer():
    client = RPCClient()
    result = await client.call_remote_async("get_data", id=123)
    return result
This approach allows you to maintain API compatibility while migrating the underlying implementation from Eventlet to AsyncIO.
Best Practices and Limitations
Best Practices
Keep awaitlet Close to I/O
Place Awaitlet calls as close as possible to the actual I/O operations to minimize performance overhead and make the code easier to understand.
import awaitlet

# Good practice
def get_user(user_id):
    # awaitlet used close to the I/O operation
    return awaitlet.awaitlet(db_query_async(f"SELECT * FROM users WHERE id={user_id}"))
Avoid Excessive Nesting
Instead of chaining several awaitlet calls in one synchronous function, combine the steps into a single async function and bridge to it once.
import awaitlet

# Avoid this
def process():
    result1 = awaitlet.awaitlet(async_operation1())
    result2 = awaitlet.awaitlet(async_operation2(result1))
    return result2

# Better approach
async def process_async():
    result1 = await async_operation1()
    result2 = await async_operation2(result1)
    return result2

def process():
    return awaitlet.awaitlet(process_async())
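Consider Concurrency Safety
Awaitlet does not start threads or create a new event loop per call; it switches greenlets within the thread that runs the event loop. Every awaitlet() call is still a point where other tasks on the loop can run, so be mindful of shared resources whose state may change across the call.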
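Whatever mechanism runs the coroutine, shared state can change at any point where control is handed back to the event loop. The asyncio-only sketch below illustrates the hazard and one possible remedy with asyncio.Lock; the `inventory` dictionary and both `reserve_*` functions are hypothetical.

```python
import asyncio

# Hypothetical shared resource touched by several concurrent operations
inventory = {"stock": 1}


async def reserve_unsafe():
    if inventory["stock"] > 0:
        await asyncio.sleep(0)  # suspension point: other tasks run here
        inventory["stock"] -= 1
        return True
    return False


async def reserve_safe(lock):
    # Holding the lock across the await makes check-and-update atomic
    async with lock:
        if inventory["stock"] > 0:
            await asyncio.sleep(0)
            inventory["stock"] -= 1
            return True
        return False


async def main():
    inventory["stock"] = 1
    unsafe = await asyncio.gather(reserve_unsafe(), reserve_unsafe())
    unsafe_stock = inventory["stock"]

    inventory["stock"] = 1
    lock = asyncio.Lock()
    safe = await asyncio.gather(reserve_safe(lock), reserve_safe(lock))
    return unsafe, unsafe_stock, safe, inventory["stock"]


unsafe, unsafe_stock, safe, safe_stock = asyncio.run(main())
```

In the unsafe variant both reservations succeed and the stock goes negative, because the check and the update are separated by a suspension point.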
Document Transition Points
Clearly document where sync-to-async transitions occur in your codebase to aid maintenance and future refactoring.
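One lightweight way to keep such documentation next to the code is a marker decorator that records each bridge function in a registry. This is only a sketch of a possible convention; `transition_point` and `TRANSITION_POINTS` are hypothetical names, and the decorated function body is a placeholder rather than a real bridge.

```python
# Registry of documented sync-to-async bridge points (hypothetical convention)
TRANSITION_POINTS = {}


def transition_point(note):
    """Mark a synchronous function as a sync-to-async bridge."""
    def decorator(func):
        # Record the note without wrapping, so behavior is unchanged
        TRANSITION_POINTS[func.__qualname__] = note
        return func
    return decorator


@transition_point("Bridges to db_query_async; remove once callers are async")
def db_query(query):
    # Placeholder body; a real bridge would hand off to the async version
    return f"rows for {query}"


rows = db_query("SELECT 1")
```

Printing `TRANSITION_POINTS` then yields an inventory of the bridges that still need to be revisited later in the migration.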
Limitations and Considerations
Performance Overhead
Awaitlet incurs some overhead from the greenlet context switch on each call. It's most efficient for I/O-bound operations where the async benefit outweighs this cost.
Cannot Use in Async Context
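The awaitlet.awaitlet() function cannot be called directly inside an async function; there, use await instead. It's designed as a bridge from sync to async, not the reverse, and it only works in synchronous code that was started through awaitlet.async_def().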
Long-Term Strategy
While Awaitlet is excellent for migration, consider whether your long-term architecture should eventually move to fully async patterns for critical paths.
Case Study: Incremental Migration of a Complex Application
Let's walk through a realistic scenario of migrating a complex application with deeply nested Eventlet dependencies using Awaitlet.
Initial Architecture
A large application with multiple services and deep Eventlet usage:
- API Server with multiple endpoints
- Worker services processing jobs in the background
- Shared internal libraries used by all components
- Extensive use of Eventlet for concurrency throughout the stack
Migration Strategy with Awaitlet
Phase 1: Create Async Versions of Low-Level Libraries
import awaitlet

# Original Eventlet-based database client
class DBClient:
    def query(self, sql):
        import eventlet
        # Eventlet-based implementation
        conn = self.get_connection()
        return eventlet.spawn(conn.execute, sql).wait()

# Migrated version with Awaitlet
class DBClient:
    def query(self, sql):
        # Same interface, but using awaitlet internally
        return awaitlet.awaitlet(self._query_async(sql))

    async def _query_async(self, sql):
        # AsyncIO implementation using asyncpg or similar
        import asyncpg
        conn = await self.get_connection_async()
        return await conn.fetch(sql)

    # Optionally expose async API
    async def query_async(self, sql):
        return await self._query_async(sql)
Phase 2: Migrate Shared Services
import awaitlet

# Shared cache service with dual interface
class CacheService:
    def __init__(self):
        # Initialize connection pool or similar
        pass

    def get(self, key):
        # Sync interface using Awaitlet
        return awaitlet.awaitlet(self._get_async(key))

    async def _get_async(self, key):
        # Async implementation. This uses the aioredis 1.x API; the project
        # has since been merged into redis-py as redis.asyncio. A real
        # service would create the pool once rather than on every call.
        import aioredis
        redis = await aioredis.create_redis_pool('redis://localhost')
        value = await redis.get(key)
        redis.close()
        await redis.wait_closed()
        return value

    # Similar pattern for set, delete, etc.
Phase 3: Optionally Expose Async APIs for New Code
# New async-aware service that uses async APIs directly
class NewAnalyticsService:
    def __init__(self, db_client, cache_service):
        self.db = db_client
        self.cache = cache_service

    async def process_data_async(self, data_id):
        # Use async APIs directly for better performance
        # (CacheService exposes no public async getter yet, hence _get_async)
        cached_result = await self.cache._get_async(f"analytics:{data_id}")
        if cached_result:
            return cached_result
        result = await self.db.query_async(f"SELECT * FROM analytics WHERE id={data_id}")
        # Process result asynchronously
        return result
Phase 4: Gradually Move to Pure AsyncIO Where Beneficial
# Example: Converting an API endpoint to fully async
from aiohttp import web

# Async API handler
async def handle_data_request(request):
    data_id = request.match_info.get('id')
    service = request.app['analytics_service']
    # Directly use async methods
    result = await service.process_data_async(data_id)
    return web.json_response(result)

# Initialize services; aiohttp passes the application to startup handlers
async def init_app(app):
    db_client = DBClient()
    cache_service = CacheService()
    app['analytics_service'] = NewAnalyticsService(db_client, cache_service)

# Configure async web app
app = web.Application()
app.router.add_get('/data/{id}', handle_data_request)
app.on_startup.append(init_app)

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8080)
Results and Benefits
Managed Migration Depth
The use of Awaitlet contained the async changes to specific layers, preventing widespread refactoring.
Incremental Adoption
Each component could be migrated independently without breaking existing functionality.
Improved Architecture
The migration resulted in a cleaner separation of concerns and better performance for I/O-bound operations.
Conclusion
Managing migration depth is one of the most challenging aspects of moving from Eventlet to AsyncIO, especially in large, complex applications. Awaitlet provides a practical solution by:
- Containing the "async infection" to manageable areas of your codebase
- Enabling incremental migration without disrupting existing functionality
- Providing a bridge between synchronous and asynchronous code
- Supporting the migration of applications with internal private APIs
By strategically using Awaitlet, you can transform deeply embedded Eventlet dependencies into modern AsyncIO code with minimal disruption to your application architecture.
Next Steps
After successfully managing your migration depth with Awaitlet, consider these next steps:
- Complete your migration to AsyncIO for high-performance async components
- Remove remaining Eventlet dependencies with confidence
- In addition to Awaitlet, take a look at the Sync-Async Adapter Pattern with AsyncIO.