Eventlet Removal Logo
Eventlet Removal

Migrating to Python Native Threading

This chapter aims to guide you through the process of migrating your codebase from Eventlet to Python native threading. We will cover the necessary steps, best practices, and common pitfalls to avoid during the migration.

Overview of Threading πŸ”—

Threading is a built-in Python module that allows you to run multiple operations concurrently in the same process space. It is part of the Python standard library and provides a foundation for concurrent programming in Python without the need for monkey patching.

Threading enables your applications to perform multiple operations in parallel, especially for I/O-bound tasks like network requests, file operations, and database queries. Unlike Eventlet's green threads, Python's threading module uses actual system threads, allowing for true parallelism on multi-core systems, though still subject to the Global Interpreter Lock (GIL) for CPU-bound tasks.

The core components of the threading module include:

  • Thread: The fundamental unit of execution. You can create a thread by instantiating the Thread class and passing a function to execute.
  • Lock: A synchronization primitive that allows only one thread to execute a particular section of code at a time, preventing race conditions.
  • RLock: A reentrant lock that can be acquired multiple times by the same thread before releasing.
  • Semaphore: A counter that controls access to a shared resource by multiple threads.
  • Event: A simple mechanism for communication between threads, allowing one thread to signal an event and other threads to wait for it.
  • Condition: A more complex synchronization primitive that combines a Lock with notification.

The threading module also integrates well with higher-level abstractions in the concurrent.futures module, which provides thread pools and executors for more efficient thread management.

For more details, you can consult the official threading documentation here and the concurrent.futures documentation here.

Replacing Eventlet with Threading πŸ”—

This section provides examples of how to replace common Eventlet-specific code with Threading equivalents. For each common Eventlet usage, we'll show how to refactor the code using Python's native threading module.

WSGI Server

Eventlet:

from eventlet import wsgi, listen

def simple_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello, World!\n']

wsgi.server(listen(('', 8080)), simple_app)

Threading:

from wsgiref.simple_server import make_server
import threading

def simple_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello, World!\n']

def run_server():
    httpd = make_server('', 8080, simple_app)
    httpd.serve_forever()

# Start in a separate thread
server_thread = threading.Thread(target=run_server)
server_thread.daemon = True  # Allow app to exit if this is the only thread
server_thread.start()

For WSGI servers with threading, use wsgiref for development or Gunicorn with thread workers for production.

Concurrent Network Calls

Eventlet:

import eventlet
from eventlet.green import urllib2

def fetch_url(url):
    return urllib2.urlopen(url).read()

urls = ['http://example.com', 'http://example.org']
pool = eventlet.GreenPool()

for body in pool.imap(fetch_url, urls):
    print(body)

Threading:

import requests
from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    response = requests.get(url)
    return response.text

urls = ['http://example.com', 'http://example.org']
with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    for future in future_to_url:
        try:
            body = future.result()
            print(body)
        except Exception as exc:
            print(f'{future_to_url[future]} generated an exception: {exc}')

Use ThreadPoolExecutor to run multiple requests concurrently with thread pools.

Background Execution of Long Tasks

Eventlet:

import eventlet

def long_running_task():
    # Simulate a long-running task
    eventlet.sleep(10)
    print("Task completed")

eventlet.spawn(long_running_task)
print("Main thread continues to run")

Threading:

import threading
import time

def long_running_task():
    # Simulate a long-running task
    time.sleep(10)
    print("Task completed")

# Create and start the thread
thread = threading.Thread(target=long_running_task)
thread.daemon = True  # Optional: make thread exit when main thread exits
thread.start()
print("Main thread continues to run")

# Optional: wait for thread completion before exiting
# thread.join()

Use threading.Thread to run tasks in background threads.

Deferred Task Management

Eventlet:

from eventlet.event import Event

def deferred_task(evt):
    print("Waiting for event")
    evt.wait()
    print("Event occurred, executing task")

evt = Event()
eventlet.spawn_after(5, deferred_task, evt)
eventlet.sleep(5)
evt.send()

Threading:

import threading
import time

def deferred_task(event):
    print("Waiting for event")
    event.wait()
    print("Event occurred, executing task")

def delayed_start(delay, target, args):
    time.sleep(delay)
    thread = threading.Thread(target=target, args=args)
    thread.start()
    return thread

event = threading.Event()

# Start thread after delay
task_thread = delayed_start(5, deferred_task, (event,))

# Wait before setting event
time.sleep(5)
event.set()

Use threading.Event for synchronization between threads, and threading.Timer for delayed execution.

Thread Management

Eventlet:

import eventlet

def task1():
    while True:
        print("Task 1 running")
        eventlet.sleep(1)

def task2():
    while True:
        print("Task 2 running")
        eventlet.sleep(1)

eventlet.spawn(task1)
eventlet.spawn(task2)
eventlet.sleep(5)  # Let tasks run for a while

Threading:

import threading
import time
import signal

def task1(stop_event):
    while not stop_event.is_set():
        print("Task 1 running")
        time.sleep(1)

def task2(stop_event):
    while not stop_event.is_set():
        print("Task 2 running")
        time.sleep(1)

stop_event = threading.Event()

thread1 = threading.Thread(target=task1, args=(stop_event,))
thread2 = threading.Thread(target=task2, args=(stop_event,))

thread1.start()
thread2.start()

# Let tasks run for a while
time.sleep(5)

# Signal threads to stop
stop_event.set()

# Wait for threads to finish
thread1.join()
thread2.join()

Use a shared threading.Event to coordinate stopping multiple threads.

Socket Operations

Eventlet:

import eventlet
from eventlet.green import socket

def echo_server(port):
    server = socket.socket()
    server.bind(('0.0.0.0', port))
    server.listen(5)
    while True:
        client_socket, addr = server.accept()
        eventlet.spawn(handle_client, client_socket)

def handle_client(client_socket):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break
        client_socket.sendall(data)
    client_socket.close()

echo_server(6000)

Threading:

import socket
import threading

def handle_client(client_socket):
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            client_socket.sendall(data)
    finally:
        client_socket.close()

def echo_server(port):
    server = socket.socket()
    server.bind(('0.0.0.0', port))
    server.listen(5)
    
    try:
        while True:
            client_socket, addr = server.accept()
            client_thread = threading.Thread(
                target=handle_client, 
                args=(client_socket,)
            )
            client_thread.daemon = True
            client_thread.start()
    finally:
        server.close()

# Run the server in a dedicated thread
server_thread = threading.Thread(target=echo_server, args=(6000,))
server_thread.daemon = True
server_thread.start()

# Keep main thread alive
server_thread.join()

Create a new thread for each client connection, or use a thread pool for better resource management.

Worker Pool Management

Eventlet:

import eventlet

def worker(item):
    print(f"Processing {item}")
    eventlet.sleep(1)  # Simulating work
    return item * 2

# Create a limited pool
pool = eventlet.GreenPool(size=4)

items = range(10)
for result in pool.imap(worker, items):
    print(f"Result: {result}")

Threading:

from concurrent.futures import ThreadPoolExecutor
import time

def worker(item):
    print(f"Processing {item}")
    time.sleep(1)  # Simulating work
    return item * 2

# Create a limited thread pool
with ThreadPoolExecutor(max_workers=4) as executor:
    items = range(10)
    for item, result in zip(items, executor.map(worker, items)):
        print(f"Result for {item}: {result}")
        
    # Alternative approach:
    # futures = [executor.submit(worker, item) for item in items]
    # for future in as_completed(futures):
    #     try:
    #         result = future.result()
    #         print(f"Result: {result}")
    #     except Exception as exc:
    #         print(f"Generated an exception: {exc}")

Use ThreadPoolExecutor from the concurrent.futures module to manage worker thread pools efficiently.

Integration with Other Libraries

Eventlet:

import eventlet
import eventlet.queue as queue

def producer(q):
    for i in range(10):
        q.put(i)
        eventlet.sleep(0.1)

def consumer(q):
    while True:
        item = q.get()
        print(f"Processed: {item}")
        q.task_done()

q = queue.Queue()
eventlet.spawn(producer, q)
eventlet.spawn(consumer, q)
q.join()

Threading:

import threading
import queue
import time

def producer(q, stop_event):
    for i in range(10):
        q.put(i)
        time.sleep(0.1)
    stop_event.set()  # Signal that production is complete

def consumer(q, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Processed: {item}")
            q.task_done()
        except queue.Empty:
            continue

q = queue.Queue()
stop_event = threading.Event()

producer_thread = threading.Thread(target=producer, args=(q, stop_event))
consumer_thread = threading.Thread(target=consumer, args=(q, stop_event))

producer_thread.start()
consumer_thread.start()

# Wait for all tasks to be processed
q.join()

# Wait for threads to finish
producer_thread.join()
consumer_thread.join()

Use Python's standard queue module with threading for thread-safe task distribution.

Tips for Refactoring

  • Use threading.Thread for basic thread creation
  • Implement proper thread synchronization using locks, events, and semaphores
  • Consider ThreadPoolExecutor for managing thread pools efficiently
  • Always handle thread join and cleanup to avoid resource leaks
  • Be aware of the Global Interpreter Lock (GIL) which can limit CPU-bound parallel performance
  • Use threading.local() for thread-local storage to avoid race conditions
  • Consider signal handling for timeouts and graceful shutdown

Key Migration Patterns πŸ”—

This section is a curated list of migration patterns that you might meet during your journey.

Removing Monkey Patching

Before (Eventlet):

import eventlet
eventlet.monkey_patch()

After (Native Threading):

# Explicit imports without altering the behavior of standard modules
import socket
import threading

Replacing Green Threads and Spawn Functions

Using spawn / spawn_after

Before (Eventlet):

import eventlet

# Asynchronous launch
eventlet.spawn(process_data, data)

# Launch with delay
eventlet.spawn_after(delay, function)

After (Threading):

import threading

# Launch a thread
threading.Thread(target=process_data, args=(data,)).start()

# Launch with a delay using Timer
threading.Timer(delay, function).start()

Refactoring Concurrency Pools and Executors

Before (Eventlet):

import eventlet

pool = eventlet.GreenPool(100)
for item in items:
    pool.spawn_n(process_item, item)
pool.waitall()

After Using ThreadPoolExecutor (Static Size):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(process_item, item) for item in items]
    for future in futures:
        future.result()  # Optionally, handle exceptions

After Using ResizableThreadPool (A Dynamically Sized Thread Pool):

To replace GreenPool with a dynamically resizable thread pool, you can use a custom implementation of a thread pool that allows resizing. Below is an example of how you can achieve this using Python's threading module:

import threading
from queue import Queue
import time

class ResizableThreadPool:
    def __init__(self, initial_size):
        self.tasks = Queue()
        self.lock = threading.Lock()
        self.threads = []
        self.resize(initial_size)

    def worker(self):
        while True:
            task, args, kwargs = self.tasks.get()
            if task is None:
                break
            try:
                task(*args, **kwargs)
            finally:
                self.tasks.task_done()

    def resize(self, new_size):
        with self.lock:
            current_size = len(self.threads)
            if new_size > current_size:
                for _ in range(new_size - current_size):
                    thread = threading.Thread(target=self.worker)
                    thread.daemon = True
                    thread.start()
                    self.threads.append(thread)
            elif new_size < current_size:
                for _ in range(current_size - new_size):
                    self.tasks.put((None, None, None))
                self.threads = self.threads[:new_size]

    def spawn(self, task, *args, **kwargs):
        self.tasks.put((task, args, kwargs))

    def join(self):
        self.tasks.join()

# Example usage
def example_task(duration):
    print(f"Task started, sleeping for {duration} seconds")
    time.sleep(duration)
    print("Task completed")

# Initialize the pool with 2 threads
pool = ResizableThreadPool(initial_size=2)

# Spawn tasks
for i in range(5):
    pool.spawn(example_task, i)

# Resize the pool to 4 threads
pool.resize(4)

# Wait for all tasks to complete
pool.join()

In this example, ResizableThreadPool allows you to dynamically resize the pool by adding or removing worker threads. You can use the spawn method to add tasks to the pool and the resize method to change the number of threads.

Please find out this living example of the implementation of this pattern.

Timeout Management

Before (Eventlet):

import eventlet

try:
    with eventlet.Timeout(5):
        do_something()
except eventlet.Timeout:
    handle_timeout()

After (Using signal or join with timeout):

Example using the signal module:

import signal

def handler(signum, frame):
    raise TimeoutError("Operation timed out")

signal.signal(signal.SIGALRM, handler)
signal.alarm(5)  # Set a timeout of 5 seconds

try:
    do_something()
except TimeoutError:
    handle_timeout()
finally:
    signal.alarm(0)  # Cancel the timeout

Example using a thread with join:

thread = threading.Thread(target=function)
thread.start()
thread.join(timeout=timeout_value)

if thread.is_alive():
    LOG.warning("Action timed out, waiting for the thread to finish...")
    thread.join()  # Extended waiting if necessary

Adjusting Sleep Calls

Before (Eventlet):

import eventlet
eventlet.sleep(0.5)

After (Using time.sleep):

import time
time.sleep(0.5)

Replacing Semaphores and Events

Before (Eventlet):

from eventlet import semaphore, event

lock = semaphore.Semaphore()
flag = event.Event()

After (Threading):

import threading

lock = threading.Semaphore()
flag = threading.Event()

Cleaning Up Imports and Reducing Eventlet Dependencies

Before (Eventlet):

import eventlet
import eventlet.wsgi

After (Using a standard WSGI server):

import socket
from wsgiref.simple_server import make_server

Refactoring Asynchronous I/O Wrappers

Before (Eventlet):

from eventlet.green import socket

s = socket.socket()
s.connect(('example.com', 80))

After (Using a server or standard I/O approach):

from wsgiref.simple_server import make_server

server = make_server('0.0.0.0', 8080, app)
server.serve_forever()

Explicit Thread Termination

Unlike Eventletβ€”which provides methods like kill() or wait()β€”with threading it is recommended to use join() to wait for a thread to finish.

Example:

thread = threading.Thread(target=function)
thread.start()
thread.join()  # Wait for the thread to finish

Threading Libraries πŸ”—

Beyond Python's standard library threading modules, several third-party libraries can enhance your threading capabilities. These libraries provide higher-level abstractions, additional functionalities, and improved performance for specific use cases.

joblib

A lightweight library providing simple tools for parallel execution. It's particularly useful for CPU-bound operations that need to bypass the GIL limitations.

Documentation

Example:

from joblib import Parallel, delayed

def process_item(item):
    return item * 2

results = Parallel(n_jobs=4)(
    delayed(process_item)(i) for i in range(10)
)

Twisted

An event-driven networking engine that can work alongside threading code for network applications. It's particularly useful for servers handling many concurrent connections.

Documentation

Key features:

  • Event-driven architecture
  • Support for many network protocols
  • Integration with threading for mixed concurrency models

SQLAlchemy

While primarily an ORM, SQLAlchemy provides excellent thread-safety features for database operations, including connection pooling designed for threaded applications.

Documentation

Thread-safe features:

  • Thread-local session management
  • Connection pooling optimized for threading
  • Thread-safe query execution

RQ (Redis Queue)

A simple Python library for queueing jobs and processing them in the background with workers. Great for distributing work across threads or processes.

Documentation

Example:

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
job = q.enqueue(long_running_function, arg1, arg2)
# Job runs in a worker thread or process

When selecting a threading library, consider your specific requirements, such as the nature of your workloads (I/O-bound vs. CPU-bound), the level of concurrency needed, and any specific features like timeout handling or thread-safe data structures that your application requires.

Remember that for CPU-bound tasks, Python's multiprocessing module might be more appropriate due to GIL limitations. For mixed workloads, combining threading with multiprocessing can provide optimal performance.

Common Pitfalls πŸ”—

Be aware of the following common pitfalls during the migration from Eventlet to native threading:

Race conditions and thread safety

Unlike Eventlet's cooperative multitasking where tasks yield control explicitly, native threads can be preempted at any time. Always use proper synchronization mechanisms (locks, semaphores, events) when accessing shared resources to prevent race conditions.

Deadlocks and thread starvation

Improper use of locks can lead to deadlocks where threads wait indefinitely for resources held by other threads. Always acquire locks in a consistent order and use timeouts when appropriate to prevent deadlocks. Also consider using threading.RLock for recursive lock acquisition in the same thread.

Thread lifecycle management

Unlike Eventlet green threads, native threads cannot be easily terminated from outside. Design your threads with proper termination signals (using threading.Event objects) and ensure all threads are joined before application exit to prevent resource leaks and orphaned threads.

Global Interpreter Lock (GIL) limitations

The Python GIL prevents multiple threads from executing Python bytecodes simultaneously, limiting performance gains for CPU-bound tasks. Threading works best for I/O-bound operations. For CPU-intensive tasks, consider using multiprocessing to achieve true parallelism.

Debugging difficulties

Multithreaded code can be notoriously difficult to debug due to its non-deterministic nature. Use thread names, comprehensive logging, and thread-local storage to make debugging easier. Consider specialized tools like faulthandler for tracking thread crashes.

Overuse of thread synchronization

Excessive use of locks and other synchronization primitives can lead to contention and performance degradation. Design your code to minimize shared state, and consider using thread-safe collections from queue or concurrent.futures modules instead of manually synchronizing access to standard collections.

Best Practices and Considerations πŸ”—

  • Active Thread Monitoring:
    Implement proactive logging to detect and manage threads that exceed expected runtimes.
    if thread.is_alive():
        LOG.warning("The action thread has exceeded the expected runtime.")
  • Explicit Executor Management:
    When using ThreadPoolExecutor, ensure you handle exceptions appropriately and monitor active thread statuses.
  • Anticipate Non-Termination of Threads:
    Native threads cannot be arbitrarily terminated; thus, design your application logic to gracefully handle thread shutdowns.

Conclusion πŸ”—

Migrating from Eventlet to native threading offers several key benefits:

  • Clarity and Transparency:
    Eliminating monkey patching ensures predictable behavior of standard modules.
  • Compatibility and Maintainability:
    Leveraging Python's standard tools (like threading and concurrent.futures) simplifies integration with other libraries and supports long-term maintenance.
  • Explicit Concurrency Control:
    Shifting to explicit threads allows for better tracking and fine-grained management of asynchronous tasks.

Whether applied in the specific context of OpenStack or in broader applications, adopting the practices presented in this chapter ensures a smooth transition to modern, robust threading mechanisms.