Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/alex-ber/AlexBerUtils/llms.txt

Use this file to discover all available pages before exploring further.

This module provides utilities for running functions in thread-pool (or other) executors while preserving contextvars.ContextVar state, bridging between sync and async contexts, and detecting unhandled exceptions in fire-and-forget tasks.
Call initConfig() from the main thread inside a running event loop before using lift_to_async, exec_in_executor, or get_main_event_loop. Without it, cross-thread event-loop dispatch will not work.
import asyncio
from alexber.utils.thread_locals import initConfig

async def startup():
    initConfig()           # call with no arguments is fine
    # optionally pass a custom executor:
    # initConfig(executor=my_executor)

asyncio.run(startup())

exec_in_executor

Runs a synchronous or asynchronous function in an executor, preserving the current ContextVar snapshot, and returns an asyncio.Future.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from alexber.utils.thread_locals import exec_in_executor

executor = ThreadPoolExecutor(max_workers=4)

async def main():
    future = exec_in_executor(executor, my_blocking_function, arg1, arg2)
    result = await future

asyncio.run(main())
The executor is resolved in order:
  1. The executor argument if not None.
  2. The executor passed to initConfig(executor=...) if set.
  3. None — asyncio’s default executor.
executor
Optional[Executor]
required
The concurrent.futures.Executor to run the function in. Pass None to use the configured global or asyncio default.
func
Callable[..., T]
required
The function or coroutine function to execute. Both sync and async def functions are accepted.
*args
Positional arguments forwarded to func.
**kwargs
Keyword arguments forwarded to func.
Returns asyncio.Future[T] — wrapped in a FutureWrapper that logs unconsumed exceptions automatically.
Each thread in the executor receives an event loop via ensure_thread_event_loop(), so async functions submitted to the executor can themselves schedule coroutines.

exec_in_executor_threading_future

Same semantics as exec_in_executor, but returns a concurrent.futures.Future (a threading.Future) instead of an asyncio.Future. Useful when you need to block a non-async thread waiting for a result.
from alexber.utils.thread_locals import exec_in_executor_threading_future

future = exec_in_executor_threading_future(executor, my_function, arg1)
result = future.result()   # blocks the calling thread
Parameters are identical to exec_in_executor.

AsyncExecutionQueue

An async context manager that manages a background worker task. Tasks submitted to the queue are executed in order through exec_in_executor, preserving ContextVar state at the time of submission.
import asyncio
from alexber.utils.thread_locals import AsyncExecutionQueue

async def main():
    async with AsyncExecutionQueue(executor=None) as queue:
        fut1 = await queue.aadd_task(my_async_func, arg1)
        fut2 = await queue.aadd_task(my_sync_func, arg2)

        result1 = await fut1
        result2 = await fut2

asyncio.run(main())

Constructor parameters

queue
asyncio.Queue
Custom queue to use. A new asyncio.Queue() is created if omitted.
executor
Executor
Executor to run tasks. Falls back to the global executor or asyncio default.

aadd_task(func, /, *args, **kwargs) -> asyncio.Future

Asynchronously enqueues a task. The current ContextVar context is captured at submission time and restored when the task runs. Returns an asyncio.Future that resolves to the function’s return value.
async with AsyncExecutionQueue() as queue:
    future = await queue.aadd_task(compute, x=10)
    print(await future)

add_task(executor, func, /, *args, **kwargs) -> threading.Future

Synchronous counterpart to aadd_task. Can be called from a non-async thread. Returns a concurrent.futures.Future.
# From a worker thread (no event loop):
future = queue.add_task(executor, compute, x=10)
result = future.result()   # blocks until done

aclose()

Cancels all pending tasks, signals the worker to exit, and waits for it to finish. Called automatically by __aexit__.
await queue.aclose()

lift_to_async

Calls an async function from synchronous code, preserving ContextVar state. Dispatches to the event loop captured by initConfig().
from alexber.utils.thread_locals import lift_to_async

# Inside a sync function running on a non-main thread:
result = lift_to_async(my_async_function, arg1, arg2, kwarg=value)
afunc
Callable[..., Awaitable[T]]
required
The async function to call. Passed as a positional-only argument.
*args
Positional arguments forwarded to afunc.
**kwargs
Keyword arguments forwarded to afunc.
Returns the result of the awaited coroutine.
lift_to_async must not be called from the main thread when an event loop is already running — doing so will deadlock. Call it from a worker thread (e.g., one spawned by asyncio.to_thread).

ensure_thread_event_loop

Initialises an event loop for the current thread if one does not already exist, and stores it in thread-local storage.
from alexber.utils.thread_locals import ensure_thread_event_loop

def thread_entry():
    ensure_thread_event_loop()
    # thread now has an event loop available
This is called internally by exec_in_executor for each worker thread, so you typically do not need to call it directly.

FutureWrapper

A lightweight proxy around an asyncio.Future that tracks whether its result was ever consumed (via await, .result(), or .exception()). If the future completes with an exception and nobody consumed it, the exception is logged automatically.
from alexber.utils.thread_locals import FutureWrapper
import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    raw_future = loop.create_future()
    wrapper = FutureWrapper(raw_future)

    raw_future.set_exception(ValueError("oops"))
    # If wrapper is never awaited, the exception will be logged

asyncio.run(demo())
Exposed attributes:
MemberDescription
__await__Marks as consumed and delegates to the underlying future.
result()Marks as consumed and returns the result.
exception()Marks as consumed and returns the exception.
add_done_callback(cb)Delegates to the underlying future.
done()Delegates to the underlying future.
_consumedboolTrue once the result has been accessed.

chain_future_results

Propagates the result or exception from a source future to a target future. Typically used as a done callback.
from alexber.utils.thread_locals import chain_future_results
import asyncio, concurrent.futures

async def demo():
    loop = asyncio.get_running_loop()
    source = loop.create_future()
    target = concurrent.futures.Future()

    source.add_done_callback(
        lambda fut: chain_future_results(fut, target)
    )

    source.set_result(42)
    print(target.result())  # 42

asyncio.run(demo())
source_future
Future
required
The future whose outcome will be read.
target_future
Future
required
The future that will receive the result or exception. No-ops if it is already resolved.

handle_future_exception

Schedules a delayed check that logs any unconsumed exception from a future. Used internally by exec_in_executor but can be called directly for custom futures.
from alexber.utils.thread_locals import handle_future_exception
import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    handle_future_exception(fut, delay=0.5)  # check after 0.5 s
    fut.set_exception(RuntimeError("unhandled"))
    await asyncio.sleep(1)  # exception is logged here

asyncio.run(demo())
future
asyncio.Future
required
The future to monitor.
delay
float
default:"0"
Seconds to wait before checking. Defaults to 0 (next iteration of the event loop).