Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/alex-ber/AlexBerUtils/llms.txt

Use this file to discover all available pages before exploring further.

Module-level types

FutureType

A type alias for values that can be awaited or used as concurrent.futures.Future.
FutureType = Union[Awaitable[T], concurrent.futures.Future]
Used as a generic return-type annotation for functions that may return either an asyncio awaitable or a threading future.

Thread-local helpers

threadlocal_var

Retrieves or lazily initialises a named attribute on a threading.local object.
thread_locals
threading.local
required
The thread-local storage object to read from / write to.
varname
str
required
The attribute name on thread_locals.
factory
Callable
required
Zero-or-more-argument callable used to create the value when it does not exist yet.
*args
Any
Positional arguments forwarded to factory.
**kwargs
Any
Keyword arguments forwarded to factory.
Returns: The existing or newly-created attribute value (Any).
from threading import local
from alexber.utils.thread_locals import threadlocal_var

_tls = local()
conn = threadlocal_var(_tls, "db_conn", create_connection, dsn="sqlite:///:memory:")

get_threadlocal_var

Retrieves a named attribute from a threading.local object, raising if it is not yet initialised.
thread_locals
threading.local
required
The thread-local storage object.
varname
str
required
The attribute name to retrieve.
Returns: The attribute value (Any). Raises:
  • ValueError — if the attribute is None (i.e. not initialised).
conn = get_threadlocal_var(_tls, "db_conn")

del_threadlocal_var

Deletes a named attribute from a threading.local object. Silently does nothing if the attribute does not exist.
thread_locals
threading.local
required
The thread-local storage object.
varname
str
required
The attribute name to delete.
Returns: None
del_threadlocal_var(_tls, "db_conn")

Utility classes

RootMixin

A cooperative-multiple-inheritance root that terminates the super().__init__(**kwargs) delegation chain. All locking mixin classes ultimately inherit from RootMixin, ensuring that **kwargs are absorbed without being forwarded to object.__init__.
class MyProxy(LockingAccessMixin, RootMixin):
    pass

validate_param

Asserts that a required parameter value is not None.
param_value
Any
required
The value to validate.
param_name
str
required
The human-readable name used in the error message.
Returns: None Raises:
  • ValueError — if param_value is None.
validate_param(lock, "lock")   # raises if lock is None

RLock

A reentrant lock that supports both synchronous and asynchronous acquisition. Unlike threading.RLock, this implementation also exposes coroutine-based acquire/release methods and full async with context-manager support, making it safe to share across sync threads and async tasks. See the design article for details.
from alexber.utils.thread_locals import RLock

lock = RLock()

# synchronous
with lock:
    ...

# asynchronous
async with lock:
    ...

RLock.acquire

Acquires the synchronous (threading) lock, blocking until available. Reentrant — the owning thread may acquire again. Returns: True

RLock.release

Releases the synchronous lock held by the current thread. Returns: True Raises:
  • RuntimeError — if the current thread does not own the lock.

RLock.async_acquire

Coroutine that acquires the asynchronous lock, suspending the current task until available. Reentrant — the owning task may acquire again. Returns: True

RLock.async_release

Coroutine that releases the asynchronous lock held by the current task. Returns: True Raises:
  • RuntimeError — if the current asyncio task does not own the lock.

RLock.__enter__ / __exit__

Context manager protocol. __enter__ calls acquire(); __exit__ calls release().

RLock.__aenter__ / __aexit__

Async context manager protocol. __aenter__ awaits async_acquire(); __aexit__ awaits async_release().

Locking Mixin classes

All mixins accept **kwargs in __init__ and expect at minimum:
kwargTypeMeaning
objAnyThe object being wrapped/proxied
lockRLockThe lock (some mixins auto-create one)

LockingIterableMixin

Wraps __iter__ so that each call to __next__ on the resulting iterator holds the lock. Returns a LockingIterator from __iter__.
proxy = LockingIterableMixin(obj=my_list, lock=RLock())
for item in proxy:
    ...

LockingAsyncIterableMixin

Wraps __aiter__ so that each __anext__ call on the resulting async iterator holds the lock. Returns a LockingAsyncIterator from __aiter__.
async for item in LockingAsyncIterableMixin(obj=async_gen, lock=lock):
    ...

LockingAccessMixin

Overrides __getattr__ to wrap every method found on the underlying object in a synchronized (or async-synchronized) wrapper. Handles Pydantic models specially (_copy_and_set_values). Inherits from LockingPedanticObjMixin.
proxy = LockingAccessMixin(obj=some_model, lock=RLock())
proxy.some_method()  # automatically acquires/releases the lock

LockingCallableMixin

Overrides __call__ to wrap the underlying callable in the lock. Detects coroutine functions automatically and applies async with self._lock in that case.
proxy = LockingCallableMixin(obj=my_func, lock=RLock())
result = proxy(arg1, arg2)

LockingGetItemMixin

Makes __getitem__ and __setitem__ on the underlying collection thread-safe.
proxy = LockingGetItemMixin(obj=my_dict, lock=RLock())
value = proxy["key"]
proxy["key"] = value

LockingSetItemMixin

Makes only __setitem__ thread-safe (read-write asymmetry for performance-oriented cases).
proxy = LockingSetItemMixin(obj=my_list, lock=RLock())
proxy[0] = new_value

SyncContextManagerMixin

Adds synchronous __enter__ / __exit__ that delegate to self._lock.acquire() / self._lock.release().
with proxy:
    ...

AsyncContextManagerMixin

Adds asynchronous __aenter__ / __aexit__ that delegate to self._lock.async_acquire() / self._lock.async_release().
async with proxy:
    ...

LockingDefaultLockMixin

Creates a new RLock() when no lock kwarg is provided, then stores it as self._lock before delegating upward.
proxy = LockingDefaultLockMixin(obj=my_obj)  # lock auto-created

LockingBaseLanguageModelMixin

Registers the proxy’s concrete type as a virtual subclass of langchain_core.language_models.BaseLanguageModel (when available), so isinstance checks pass transparently. Requires obj kwarg. No-op if langchain_core is not installed.

LockingDefaultAndBaseLanguageModelMixin

Combines LockingDefaultLockMixin and LockingBaseLanguageModelMixin: auto-creates a lock and registers as a BaseLanguageModel subtype.

LockingProxy

A ready-to-use thread-safe proxy that composes all locking mixins. MRO includes: LockingDefaultAndBaseLanguageModelMixin, LockingIterableMixin, LockingAsyncIterableMixin, LockingAccessMixin, LockingCallableMixin, LockingGetItemMixin, LockingSetItemMixin, SyncContextManagerMixin, AsyncContextManagerMixin. See the design article for details.
obj
Any
required
The object to wrap.
lock
RLock
default:"RLock()"
The reentrant lock to use. Auto-created if omitted.
Returns: A LockingProxy instance whose every operation is guarded by the lock.
from alexber.utils.thread_locals import LockingProxy

proxy = LockingProxy(obj=my_langchain_model)
# iterate, call, await, or use as context manager — all thread-safe
result = proxy(prompt)

Thread / event-loop utilities

is_running_in_main_thread

Returns True when called from the process’s main thread. Returns: bool
if is_running_in_main_thread():
    asyncio.run(main())

lift_to_async

Calls an async function from synchronous code, preserving the current contextvars.Context.
afunc
Callable[..., Awaitable[T]]
required
The async function to execute. Positional-only.
*args
Any
Positional arguments forwarded to afunc.
**kwargs
Any
Keyword arguments forwarded to afunc.
Returns: The return value of afunc (T). Raises:
  • RuntimeError — if called from the main thread while an event loop is already running (would block the loop).
result = lift_to_async(my_async_fn, arg1, key=val)
StopAsyncIteration raised inside afunc is automatically converted to RuntimeError to avoid hanging futures.

FutureWrapper

A thin proxy around asyncio.Future that tracks whether the result was explicitly consumed (awaited or retrieved via result() / exception()). Unconsumed exceptions are logged automatically.
fut
asyncio.Future
required
The future to wrap.

FutureWrapper.__await__

Awaits the underlying future, marking it as consumed.

FutureWrapper.result

Returns the future’s result, marking it as consumed. Raises: Whatever the underlying future raises.

FutureWrapper.exception

Returns the future’s exception (or None), marking it as consumed.

FutureWrapper.add_done_callback

Registers a done callback on the underlying future.
callback
Callable[[asyncio.Future], None]
required
The callback to invoke when the future completes.

FutureWrapper.done

Returns True if the underlying future is complete. Returns: bool

check_and_log_exception

Inspects a FutureWrapper without consuming it. If the future is done but unconsumed and holds an exception, the exception is logged at ERROR level.
wrapper
FutureWrapper
required
The wrapped future to inspect.
Returns: None
check_and_log_exception(wrapper)  # called internally by handle_future_exception

handle_future_exception

Schedules a delayed call to check_and_log_exception on the running event loop. Useful for fire-and-forget tasks where exceptions should still surface in logs.
future
asyncio.Future
required
The future to monitor.
delay
float
default:"0"
Seconds to wait before checking the future.
Returns: None
fut = asyncio.ensure_future(my_coro())
handle_future_exception(fut, delay=0.5)

ensure_thread_event_loop

Ensures the current (non-main) thread has an asyncio event loop attached. If none exists, a new one is created with asyncio.new_event_loop() and stored in thread-local storage. Returns: None
# Call once at the start of a worker thread
ensure_thread_event_loop()

exec_in_executor

Runs a sync or async function inside an executor while preserving the current ContextVar context.
executor
Optional[concurrent.futures.Executor]
required
The executor to use. If None, falls back to the executor set in initConfig(), then to the asyncio default.
func
Callable[..., T]
required
The sync or async function to run.
*args
Any
Positional arguments for func.
**kwargs
Any
Keyword arguments for func.
Returns: asyncio.Future[T] — wrapped in a FutureWrapper that logs unconsumed exceptions.
Must be called from within a running event loop (async context).
async def main():
    fut = exec_in_executor(None, cpu_bound_work, data)
    result = await fut

exec_in_executor_threading_future

Like exec_in_executor, but returns a concurrent.futures.Future instead of an asyncio.Future. Useful when the caller is on a non-async thread.
executor
Optional[concurrent.futures.Executor]
required
The executor to use. Resolution order same as exec_in_executor.
func
Callable[..., T]
required
The sync or async function to run.
*args
Any
Positional arguments for func.
**kwargs
Any
Keyword arguments for func.
Returns: concurrent.futures.Future[T]
future = exec_in_executor_threading_future(executor, process_item, item)
result = future.result()  # blocks the calling thread

chain_future_results

Propagates the outcome (result or exception) from source_future to target_future. Works with any combination of asyncio.Future and concurrent.futures.Future. Typically used as a done callback:
source.add_done_callback(lambda fut: chain_future_results(fut, target))
source_future
FutureType
required
The completed future to read from.
target_future
FutureType
required
The future to set the result/exception on.
Returns: None (no-op if target_future is already done).

get_main_event_loop

Returns the event loop stored by initConfig(). Returns None if initConfig() was never called. Returns: asyncio.AbstractEventLoop | None
loop = get_main_event_loop()

AsyncExecutionQueue

An async-context-manager task queue that serialises task submission and executes each task in a given executor, preserving ContextVar context.
queue
asyncio.Queue
default:"asyncio.Queue()"
Custom queue instance. A fresh asyncio.Queue is created when omitted.
executor
Optional[concurrent.futures.Executor]
default:"None"
Executor for task execution. Falls back to initConfig() executor, then asyncio default.
async with AsyncExecutionQueue(executor=my_pool) as q:
    fut = await q.aadd_task(my_func, arg1, key=val)
    result = await fut

AsyncExecutionQueue.__aenter__ / __aexit__

Starts the background worker task on enter; calls aclose() on exit.

AsyncExecutionQueue.worker

Coroutine. Continuously dequeues and executes tasks until a close sentinel is received. Each task is run via exec_in_executor inside the stored context.

AsyncExecutionQueue.aadd_task

Asynchronously enqueues a callable for execution.
func
Callable
required
The sync or async function to execute. Positional-only.
*args
Any
Positional arguments forwarded to func.
**kwargs
Any
Keyword arguments forwarded to func.
Returns: asyncio.Future — resolves to the return value of func.

AsyncExecutionQueue.add_task

Synchronous variant of aadd_task — submits the task from a non-async thread via exec_in_executor_threading_future.
executor
concurrent.futures.Executor
required
Executor used to run the async submission itself.
func
Callable[..., Any]
required
The sync or async function to execute. Positional-only.
*args
Any
Positional arguments forwarded to func.
**kwargs
Any
Keyword arguments forwarded to func.
Returns: concurrent.futures.Future

AsyncExecutionQueue.aclose

Cancels all pending tasks in the queue, then enqueues a close sentinel and awaits the worker coroutine. Returns: None (coroutine)

Context variable helpers

get_context_vars

Collects all top-level ContextVar instances from one or more modules and associates each with a factory callable. As a side effect, calls reset_context_vars on the collected entities.
*modules
ModuleType
required
One or more modules to scan.
factory_method_creator
Optional[Callable[[ContextVar, ModuleType], Callable]]
default:"None"
Custom resolver: given a ContextVar and its module, returns a factory callable. When None, looks up <var.name>_DEFAULT in the same module.
Returns: List[Dict] — each dict has keys 'var' (ContextVar) and 'factory' (Callable).
import my_module
from alexber.utils.thread_locals import get_context_vars

entities = get_context_vars(my_module)
# my_module must expose FOO_DEFAULT callable for each ContextVar named FOO

reset_context_vars

Resets each ContextVar in entities to the value returned by its associated factory.
*entities
Dict
required
Dicts produced by get_context_vars, each with 'var' and 'factory' keys.
Returns: None Raises:
  • ValueError — if a factory is not callable.
reset_context_vars(*entities)  # re-initialize all context vars to defaults

initConfig

Initialises global state required by lift_to_async(), exec_in_executor(), and get_main_event_loop(). Must be called from the main thread while an event loop is running.
executor
Optional[concurrent.futures.Executor]
default:"None"
Sets the global default executor. When None, the asyncio default is used.
Returns: None Raises:
  • RuntimeError — if no event loop is currently running.
async def startup():
    from concurrent.futures import ThreadPoolExecutor
    initConfig(executor=ThreadPoolExecutor(max_workers=4))