libp2p.tools.anyio_service package

Submodules

libp2p.tools.anyio_service.api module

Public Service API layer with decorators and helper functions.

libp2p.tools.anyio_service.api.AsyncFn

alias of Callable[[...], Awaitable[Any]]

class libp2p.tools.anyio_service.api.InternalManagerAPI

Bases: ManagerAPI

Internal interface for service managers with task scheduling capabilities.

This extends ManagerAPI with methods that should only be used internally by the service implementation, not by external callers.

abstract run_child_service(service: ServiceAPI, daemon: bool = False, name: str | None = None) ManagerAPI

Run a child service in the background.

abstract run_daemon_child_service(service: ServiceAPI, name: str | None = None) ManagerAPI

Run a daemon child service.

abstract run_daemon_task(async_fn: Callable[[...], Awaitable[Any]], *args: Any, name: str | None = None) None

Run a daemon task (expected to run indefinitely).

abstract run_task(async_fn: Callable[[...], Awaitable[Any]], *args: Any, daemon: bool = False, name: str | None = None) None

Run a task in the background.
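
The difference between a regular task and a daemon task can be illustrated with a minimal sketch. It uses only asyncio and does not call this package; `ToyDaemonTaskExit` and `run_task_body` are hypothetical names, and the rule that a daemon task returning normally is reported as an error is an assumption drawn from the DaemonTaskExit description.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyDaemonTaskExit(Exception):
    """Hypothetical stand-in for DaemonTaskExit."""


async def run_task_body(
    async_fn: Callable[..., Awaitable[Any]], *args: Any, daemon: bool = False
) -> None:
    """Await async_fn and, for daemon tasks, treat a normal return as an error."""
    await async_fn(*args)
    if daemon:
        # Assumption: a daemon task is expected to run until it is cancelled.
        raise ToyDaemonTaskExit(f"daemon task {async_fn.__name__} exited")


async def finishes_quickly() -> None:
    await asyncio.sleep(0)


async def demo() -> list:
    outcomes = []
    # A regular task may return whenever its work is done.
    await run_task_body(finishes_quickly)
    outcomes.append("regular task returned")
    # The same function scheduled as a daemon is reported when it returns.
    try:
        await run_task_body(finishes_quickly, daemon=True)
    except ToyDaemonTaskExit:
        outcomes.append("daemon exit reported")
    return outcomes
```

Running `asyncio.run(demo())` records one entry for the regular task and one for the reported daemon exit.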

class libp2p.tools.anyio_service.api.ManagerAPI

Bases: ABC

External interface for service managers.

abstract cancel() None

Cancel the service (non-blocking).

abstract property did_error: bool

Return True if the service encountered an error.

abstract property is_cancelled: bool

Return True if the service has been cancelled.

abstract property is_finished: bool

Return True if the service has finished.

abstract property is_running: bool

Return True if the service is actively running.

abstract property is_started: bool

Return True if the service has been started.

abstract property stats: Stats

Get service statistics.

abstract async stop() None

Stop the service and wait for completion.

abstract async wait_finished() None

Wait for the service to finish.

abstract async wait_started() None

Wait for the service to start.
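
The following sketch shows one way the lifecycle flags and wait methods above could relate to each other. It is an asyncio-only stand-in, not the package's implementation; `ToyManager` and `snapshot` are hypothetical, and the definition of `is_running` as "started, not cancelled, not finished" is an assumption.

```python
import asyncio


class ToyManager:
    """Hypothetical stand-in for a ManagerAPI implementation."""

    def __init__(self) -> None:
        self._started = asyncio.Event()
        self._cancelled = asyncio.Event()
        self._finished = asyncio.Event()

    @property
    def is_started(self) -> bool:
        return self._started.is_set()

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled.is_set()

    @property
    def is_finished(self) -> bool:
        return self._finished.is_set()

    @property
    def is_running(self) -> bool:
        # Assumption: running means started, not cancelled, not finished.
        return self.is_started and not self.is_cancelled and not self.is_finished

    def cancel(self) -> None:
        # Non-blocking: only records the cancellation request.
        self._cancelled.set()

    async def stop(self) -> None:
        # Blocking counterpart: request cancellation, then wait for the end.
        self.cancel()
        await self.wait_finished()

    async def wait_started(self) -> None:
        await self._started.wait()

    async def wait_finished(self) -> None:
        await self._finished.wait()

    async def run(self) -> None:
        self._started.set()
        try:
            await self._cancelled.wait()  # placeholder for real service work
        finally:
            self._finished.set()


def snapshot(manager: ToyManager) -> tuple:
    return (manager.is_started, manager.is_running, manager.is_finished)


async def demo() -> list:
    manager = ToyManager()
    snapshots = [snapshot(manager)]       # before start
    runner = asyncio.create_task(manager.run())
    await manager.wait_started()
    snapshots.append(snapshot(manager))   # while running
    await manager.stop()
    await runner
    snapshots.append(snapshot(manager))   # after stop
    return snapshots
```

The sketch separates `cancel()`, which returns immediately, from `stop()`, which also waits for the finished state.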

class libp2p.tools.anyio_service.api.Service

Bases: ServiceAPI

AnyIO-based service implementation.

get_manager() ManagerAPI

External access to the manager.

property manager: InternalManagerAPI

Internal access to the manager (for subclasses).

class libp2p.tools.anyio_service.api.ServiceAPI

Bases: ABC

Abstract base class for services.

abstract get_manager() ManagerAPI

Get the manager for this service.

abstract async run() None

Main service logic; subclasses implement this method.

libp2p.tools.anyio_service.api.as_service(service_fn: Callable[[...], Awaitable[Any]]) type[ServiceAPI]

Convert a simple async function into a Service class.
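
As a rough illustration of the idea, the sketch below turns an async function into a class whose run() awaits that function. `ToyService` and `toy_as_service` are hypothetical stand-ins written for this example; which arguments the real decorator forwards to the wrapped function is not documented here, so the sketch forwards only the constructor arguments.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyService:
    """Hypothetical minimal base class standing in for Service."""

    async def run(self) -> None:
        raise NotImplementedError


def toy_as_service(service_fn: Callable[..., Awaitable[Any]]) -> type:
    """Wrap an async function in a class whose run() awaits it."""

    class _FunctionService(ToyService):
        def __init__(self, *args: Any, **kwargs: Any) -> None:
            self._args = args
            self._kwargs = kwargs

        async def run(self) -> None:
            # Assumption: only the constructor arguments are forwarded.
            await service_fn(*self._args, **self._kwargs)

    _FunctionService.__name__ = service_fn.__name__
    return _FunctionService


@toy_as_service
async def collect(sink: list, item: str) -> None:
    sink.append(item)


sink: list = []
# `collect` is now a class; instantiating it captures the arguments.
asyncio.run(collect(sink, "hello").run())
```

After the decorator is applied, `collect` is a class rather than a coroutine function, so each call creates a new service instance.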

libp2p.tools.anyio_service.api.external_api(func: TFunc) TFunc

Decorator to protect external API methods.

MEDIUM COMPLEXITY: Ensures that methods can only be called while the service is running. Uses AnyIO streams and task groups to race the API call against service shutdown.
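
The racing behaviour can be sketched with plain asyncio. This is not the package's implementation: it swaps `asyncio.wait` in for the AnyIO streams and task groups, and `toy_external_api`, `ToyLifecycleError`, and `ToyService` are hypothetical names.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


class ToyLifecycleError(Exception):
    """Hypothetical stand-in for LifecycleError."""


def toy_external_api(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    @functools.wraps(func)
    async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self.running:
            raise ToyLifecycleError(f"{func.__name__} called while not running")
        call = asyncio.ensure_future(func(self, *args, **kwargs))
        shutdown = asyncio.ensure_future(self.stopped.wait())
        # Race the API call against the shutdown signal.
        done, pending = await asyncio.wait(
            {call, shutdown}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        if call in done:
            return call.result()
        raise ToyLifecycleError(f"service stopped during {func.__name__}")

    return wrapper


class ToyService:
    def __init__(self) -> None:
        self.running = True
        self.stopped = asyncio.Event()

    def shut_down(self) -> None:
        self.running = False
        self.stopped.set()

    @toy_external_api
    async def slow_echo(self, value: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return value


async def demo() -> list:
    service = ToyService()
    outcomes = [await service.slow_echo("fast", 0.0)]
    # Shut the service down while a slow call is still in flight.
    asyncio.get_running_loop().call_later(0.01, service.shut_down)
    try:
        await service.slow_echo("slow", 5.0)
    except ToyLifecycleError:
        outcomes.append("interrupted")
    # Calls made after shutdown are rejected before they start.
    try:
        await service.slow_echo("late", 0.0)
    except ToyLifecycleError:
        outcomes.append("rejected")
    return outcomes
```

In this sketch a call that loses the race is cancelled rather than left running, so the caller gets an error instead of waiting on a service that has already stopped.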

libp2p.tools.anyio_service.context module

Context managers and aliases for running services.

class libp2p.tools.anyio_service.context.AnyIOManager(service: ServiceAPI, max_children_per_task: int = 1000, logger: logging.Logger | None = None)

Bases: InternalManagerAPI

AnyIO-based service manager with full lifecycle management.

Implements proper lifecycle with locks, stats tracking, dual nursery architecture, task hierarchy, and ordered cancellation.

cancel() None

Cancel the service (non-blocking).

HIGH COMPLEXITY: Enhanced validation and state management.

property did_error: bool

Return True if the service encountered an error.

property is_cancelled: bool

Return True if the service has been cancelled.

property is_finished: bool

Return True if the service has finished.

property is_running: bool

Return True if the service is actively running.

property is_started: bool

Return True if the service has been started.

async run() None

Main run loop with proper lifecycle management.

HIGH COMPLEXITY:

- Lock-based lifecycle protection
- Dual nursery architecture (system + task)
- Cancellation handler
- Error collection and propagation
- Proper cleanup on exit

run_child_service(service: ServiceAPI, daemon: bool = False, name: str | None = None) ManagerAPI

Run a child service.

HIGH COMPLEXITY:

- Creates ChildServiceTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Returns child manager for external control

run_daemon_child_service(service: ServiceAPI, name: str | None = None) ManagerAPI

Run a daemon child service.

run_daemon_task(async_fn: AsyncFn, *args: Any, name: str | None = None) None

Run a daemon task (expected to run indefinitely).

async classmethod run_service(service: ServiceAPI) None

Class method to run a service.

run_task(async_fn: AsyncFn, *args: Any, daemon: bool = False, name: str | None = None, _internal: bool = False) None

Run a task in the background.

HIGH COMPLEXITY:

- Creates FunctionTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Schedules for execution

property stats: Stats

Return current statistics.

async stop() None

Stop the service and wait for completion.

async wait_finished() None

Wait for the service to finish.

async wait_started() None

Wait for the service to start.

class libp2p.tools.anyio_service.context.TrioManager(service: ServiceAPI, max_children_per_task: int = 1000, logger: logging.Logger | None = None)

Bases: InternalManagerAPI

Trio-based service manager with full lifecycle management.

Implements proper lifecycle with locks, stats tracking, dual nursery architecture, task hierarchy, and ordered cancellation.

cancel() None

Cancel the service (non-blocking).

HIGH COMPLEXITY: Enhanced validation and state management.

property did_error: bool

Return True if the service encountered an error.

property is_cancelled: bool

Return True if the service has been cancelled.

property is_finished: bool

Return True if the service has finished.

property is_running: bool

Return True if the service is actively running.

property is_started: bool

Return True if the service has been started.

async run() None

Main run loop with proper lifecycle management.

HIGH COMPLEXITY:

- Lock-based lifecycle protection
- Dual nursery architecture (system + task)
- Cancellation handler
- Error collection and propagation
- Proper cleanup on exit

run_child_service(service: ServiceAPI, daemon: bool = False, name: str | None = None) ManagerAPI

Run a child service.

HIGH COMPLEXITY:

- Creates ChildServiceTask with full lifecycle
- Finds parent using trio.lowlevel.current_task()
- Adds to task hierarchy
- Returns child manager for external control

run_daemon_child_service(service: ServiceAPI, name: str | None = None) ManagerAPI

Run a daemon child service.

run_daemon_task(async_fn: AsyncFn, *args: Any, name: str | None = None) None

Run a daemon task (expected to run indefinitely).

async classmethod run_service(service: ServiceAPI) None

Class method to run a service.

run_task(async_fn: AsyncFn, *args: Any, daemon: bool = False, name: str | None = None, _internal: bool = False) None

Run a task in the background.

HIGH COMPLEXITY:

- Creates FunctionTask with full lifecycle
- Finds parent using trio.lowlevel.current_task()
- Adds to task hierarchy
- Schedules for execution

property stats: Stats

Return current statistics.

async stop() None

Stop the service and wait for completion.

async wait_finished() None

Wait for the service to finish.

async wait_started() None

Wait for the service to start.

libp2p.tools.anyio_service.context.background_anyio_service(service: ServiceAPI) AsyncIterator[ManagerAPI]

Run a service in the background using AnyIO’s structured concurrency.

The service runs for the duration of the context block and is cleaned up when the block exits.

This uses AnyIO and can work with either Trio or asyncio backends.

libp2p.tools.anyio_service.context.background_trio_service(service: ServiceAPI) AsyncIterator[ManagerAPI]

Run a service in the background using Trio’s structured concurrency.

The service runs for the duration of the context block and is cleaned up when the block exits.

Note: This uses TrioManager with pure Trio primitives for compatibility with existing Trio-based services (Swarm, PubSub, KadDHT, etc.).
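
The context-manager pattern shared by the two functions above can be sketched as follows. This is an asyncio-only stand-in and does not use AnyIOManager or TrioManager; `toy_background_service` and `TickService` are hypothetical, and the sketch yields the underlying task where the real functions yield a ManagerAPI.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class TickService:
    """Hypothetical service that counts ticks until it is cancelled."""

    def __init__(self) -> None:
        self.ticks = 0
        self.cleaned_up = False

    async def run(self) -> None:
        try:
            while True:
                self.ticks += 1
                await asyncio.sleep(0.001)
        finally:
            self.cleaned_up = True


@asynccontextmanager
async def toy_background_service(service: TickService) -> AsyncIterator[asyncio.Task]:
    task = asyncio.create_task(service.run())
    await asyncio.sleep(0)  # let the service take its first step
    try:
        yield task
    finally:
        # Leaving the block cancels the service and waits for its cleanup.
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)


async def demo() -> tuple:
    service = TickService()
    async with toy_background_service(service) as task:
        await asyncio.sleep(0.01)
        running_inside = not task.done() and service.ticks > 0
    return running_inside, task.done(), service.cleaned_up
```

The point of the pattern is that the caller never has to stop the service explicitly: exiting the `async with` block, normally or through an exception, triggers the cleanup.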

libp2p.tools.anyio_service.exceptions module

Exceptions used by the AnyIO-based service framework.

exception libp2p.tools.anyio_service.exceptions.DaemonTaskExit

Bases: ServiceException

Exception raised when a daemon task exits unexpectedly.

exception libp2p.tools.anyio_service.exceptions.LifecycleError

Bases: ServiceException

Error raised when service operations are performed outside of service lifecycle.

exception libp2p.tools.anyio_service.exceptions.ServiceException

Bases: Exception

Base class for Service exceptions.

exception libp2p.tools.anyio_service.exceptions.TooManyChildrenException

Bases: ServiceException

Raised when too many child tasks are spawned.
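
Because every exception in this module derives from ServiceException, callers can handle a specific failure first and fall back to the base class. The sketch below redefines the hierarchy locally, mirroring the documented bases, so that it runs without the package installed; `run_guarded` and `raiser` are hypothetical helpers.

```python
from typing import Callable


# Local mirror of the documented hierarchy (not imported from the package).
class ServiceException(Exception):
    pass


class DaemonTaskExit(ServiceException):
    pass


class LifecycleError(ServiceException):
    pass


class TooManyChildrenException(ServiceException):
    pass


def run_guarded(action: Callable[[], None]) -> str:
    """Run action and classify any service-related failure."""
    try:
        action()
    except LifecycleError:
        # The most specific handler must come first.
        return "lifecycle"
    except ServiceException:
        # Catches DaemonTaskExit, TooManyChildrenException, and the base class.
        return "service"
    return "ok"


def raiser(exc_type: type) -> Callable[[], None]:
    def action() -> None:
        raise exc_type("boom")

    return action
```

With the real package, the same handler ordering would apply to the classes imported from `libp2p.tools.anyio_service.exceptions`.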

libp2p.tools.anyio_service.manager module

Service manager implementation with lifecycle management.

class libp2p.tools.anyio_service.manager.AnyIOManager(service: ServiceAPI, max_children_per_task: int = 1000, logger: logging.Logger | None = None)

Bases: InternalManagerAPI

AnyIO-based service manager with full lifecycle management.

Implements proper lifecycle with locks, stats tracking, dual nursery architecture, task hierarchy, and ordered cancellation.

cancel() None

Cancel the service (non-blocking).

HIGH COMPLEXITY: Enhanced validation and state management.

property did_error: bool

Return True if the service encountered an error.

property is_cancelled: bool

Return True if the service has been cancelled.

property is_finished: bool

Return True if the service has finished.

property is_running: bool

Return True if the service is actively running.

property is_started: bool

Return True if the service has been started.

async run() None

Main run loop with proper lifecycle management.

HIGH COMPLEXITY:

- Lock-based lifecycle protection
- Dual nursery architecture (system + task)
- Cancellation handler
- Error collection and propagation
- Proper cleanup on exit

run_child_service(service: ServiceAPI, daemon: bool = False, name: str | None = None) ManagerAPI

Run a child service.

HIGH COMPLEXITY:

- Creates ChildServiceTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Returns child manager for external control

run_daemon_child_service(service: ServiceAPI, name: str | None = None) ManagerAPI

Run a daemon child service.

run_daemon_task(async_fn: AsyncFn, *args: Any, name: str | None = None) None

Run a daemon task (expected to run indefinitely).

async classmethod run_service(service: ServiceAPI) None

Class method to run a service.

run_task(async_fn: AsyncFn, *args: Any, daemon: bool = False, name: str | None = None, _internal: bool = False) None

Run a task in the background.

HIGH COMPLEXITY:

- Creates FunctionTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Schedules for execution

property stats: Stats

Return current statistics.

async stop() None

Stop the service and wait for completion.

async wait_finished() None

Wait for the service to finish.

async wait_started() None

Wait for the service to start.

libp2p.tools.anyio_service.stats module

Lightweight statistics types (named tuples) used by the service framework.

class libp2p.tools.anyio_service.stats.Stats(tasks: TaskStats)

Bases: NamedTuple

Overall service statistics.

tasks: TaskStats

Alias for field number 0

class libp2p.tools.anyio_service.stats.TaskStats(total_count: int, finished_count: int)

Bases: NamedTuple

Statistics for task execution.

finished_count: int

Alias for field number 1

property pending_count: int

Return the number of tasks that are still running.

total_count: int

Alias for field number 0
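
A minimal sketch of how these two named tuples fit together is shown below. The classes are redefined locally with the documented fields so that the example runs on its own; the formula for `pending_count` (total minus finished) is an assumption, since the property's implementation is not shown here.

```python
from typing import NamedTuple


class TaskStats(NamedTuple):
    total_count: int
    finished_count: int

    @property
    def pending_count(self) -> int:
        # Assumption: pending tasks are those counted but not yet finished.
        return self.total_count - self.finished_count


class Stats(NamedTuple):
    tasks: TaskStats


stats = Stats(tasks=TaskStats(total_count=5, finished_count=3))

# Fields are available by name and, as with any tuple, by position.
total, finished = stats.tasks
pending = stats.tasks.pending_count
```

Being tuples, both objects are immutable snapshots, so a value read from a manager's `stats` property does not change after it is returned.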

libp2p.tools.anyio_service.tasks module

Task abstractions for the AnyIO service framework.

class libp2p.tools.anyio_service.tasks.BaseTask(name: str, daemon: bool, parent: TaskWithChildrenAPI | None)

Bases: TaskAPI

Base task implementation with common functionality.

class libp2p.tools.anyio_service.tasks.BaseTaskWithChildren(name: str, daemon: bool, parent: TaskWithChildrenAPI | None, max_children: int = 1000)

Bases: BaseTask, TaskWithChildrenAPI

Base task that can manage child tasks.

add_child(child: TaskAPI) None

Add a child task.

discard_child(child: TaskAPI) None

Remove a child task.
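
The child bookkeeping can be pictured with the sketch below. `ToyParentTask` and `ToyTooManyChildren` are hypothetical, and the idea that add_child enforces `max_children` by raising is an assumption based on the constructor parameter and the TooManyChildrenException description.

```python
class ToyTooManyChildren(Exception):
    """Hypothetical stand-in for TooManyChildrenException."""


class ToyParentTask:
    def __init__(self, max_children: int = 1000) -> None:
        self.children: set = set()
        self._max_children = max_children

    def add_child(self, child: object) -> None:
        # Assumption: the limit is checked when a child is registered.
        if len(self.children) >= self._max_children:
            raise ToyTooManyChildren(
                f"cannot track more than {self._max_children} children"
            )
        self.children.add(child)

    def discard_child(self, child: object) -> None:
        # set.discard is a no-op when the child is not present.
        self.children.discard(child)


parent = ToyParentTask(max_children=2)
parent.add_child("a")
parent.add_child("b")
try:
    parent.add_child("c")
    overflowed = False
except ToyTooManyChildren:
    overflowed = True

# Removing a finished child frees a slot for a new one.
parent.discard_child("a")
parent.add_child("c")
```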

class libp2p.tools.anyio_service.tasks.ChildServiceTask(name: str, daemon: bool, parent: TaskWithChildrenAPI | None, child_service: ServiceAPI)

Bases: BaseTask

Task that wraps a child service.

HIGH COMPLEXITY:

- Creates a manager for the child service
- Integrates with task hierarchy
- Proper lifecycle management

async cancel() None

Cancel the child service.

property is_done: bool

Check if the task is completed.

async run() None

Run the child service.

async wait_done() None

Wait for the task to complete.

class libp2p.tools.anyio_service.tasks.FunctionTask(name: str, daemon: bool, parent: TaskWithChildrenAPI | None, async_fn: Any, async_fn_args: Sequence[Any], count_in_stats: bool = True, max_children: int = 1000)

Bases: BaseTaskWithChildren

Task that wraps an async function with full lifecycle management.

HIGH COMPLEXITY:

- Tracks the anyio task for parent finding
- Has its own cancel scope for ordered cancellation
- Waits for children before completing (if not daemon)
- Manages lifecycle with events

property anyio_task: TaskInfo

async cancel() None

Cancel this task and all children.

HIGH COMPLEXITY: Ordered cancellation (children first)
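
Children-first ordering can be sketched with a small task tree. `ToyTask` is a hypothetical class that only records the order in which tasks are cancelled; real cancellation would also involve cancel scopes, which this sketch leaves out.

```python
import asyncio


class ToyTask:
    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.children: list = []
        self._log = log

    def add_child(self, name: str) -> "ToyTask":
        child = ToyTask(name, self._log)
        self.children.append(child)
        return child

    async def cancel(self) -> None:
        # Cancel every child (and its descendants) before this task.
        for child in self.children:
            await child.cancel()
        self._log.append(self.name)


async def demo() -> list:
    log: list = []
    root = ToyTask("root", log)
    worker = root.add_child("worker")
    worker.add_child("worker.helper")
    root.add_child("monitor")
    await root.cancel()
    return log
```

Cancelling in this order means a parent is still alive while its children shut down, so children can rely on resources the parent owns during their cleanup.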

property count_in_stats: bool

Return whether this task should be included in user-facing statistics.

property has_anyio_task: bool

property is_done: bool

Check if the task is completed.

classmethod iterate_tasks(*tasks: TaskAPI) Iterable[FunctionTask]

Iterate over all FunctionTask instances and their children recursively.
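
A recursive walk of this kind might look like the sketch below. `ToyFunctionTask` and `ToyLeafTask` are hypothetical stand-ins; the pre-order traversal and the skipping of tasks that are not function tasks are assumptions, and children are kept in a list here (the real `children` attribute is a set) so that the order is deterministic.

```python
from typing import Iterator, Sequence


class ToyLeafTask:
    """Hypothetical task without children (for example, a child service)."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyFunctionTask:
    """Hypothetical task that may own child tasks."""

    def __init__(self, name: str, children: Sequence[object] = ()) -> None:
        self.name = name
        self.children = list(children)


def iterate_function_tasks(*tasks: object) -> Iterator[ToyFunctionTask]:
    for task in tasks:
        # Assumption: only function tasks are yielded; other kinds are skipped.
        if isinstance(task, ToyFunctionTask):
            yield task
            yield from iterate_function_tasks(*task.children)


tree = ToyFunctionTask(
    "root",
    [
        ToyFunctionTask("a", [ToyLeafTask("a.service"), ToyFunctionTask("a.1")]),
        ToyLeafTask("b.service"),
    ],
)
names = [task.name for task in iterate_function_tasks(tree)]
```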

async run() None

Run the function with full lifecycle management.

HIGH COMPLEXITY:

- Sets anyio task for parent finding
- Runs within cancel scope
- Enforces daemon behavior
- Waits for children before completing (if not daemon)
- Cleans up parent reference

async wait_done() None

Wait for the task to complete.

class libp2p.tools.anyio_service.tasks.TaskAPI

Bases: Hashable

Abstract base class for tasks in the service framework.

abstract async cancel() None

Cancel the task.

daemon: bool

abstract property is_done: bool

Check if the task is completed.

name: str
parent: TaskWithChildrenAPI | None

abstract async run() None

Execute the task logic.

abstract async wait_done() None

Wait for the task to complete.

class libp2p.tools.anyio_service.tasks.TaskType(value)

Bases: str, Enum

Task type identifiers for internal framework tasks.

Using an enum ensures no collision with user task names and provides a single source of truth for internal task naming.
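
Because the enum mixes in `str`, its members compare equal to plain strings, which is what makes them usable as task names. The sketch below redefines the enum locally with the documented values so that it runs on its own; `is_framework_task` is a hypothetical helper.

```python
from enum import Enum


class TaskType(str, Enum):
    INTERNAL_SERVICE_RUN = "_internal:service.run"
    USER = "user"


def is_framework_task(name: str) -> bool:
    # Members of a str-based enum compare equal to their string values.
    return name == TaskType.INTERNAL_SERVICE_RUN


# Looking a member up by value returns the singleton member.
looked_up = TaskType("user")
```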

INTERNAL_SERVICE_RUN = '_internal:service.run'
USER = 'user'

class libp2p.tools.anyio_service.tasks.TaskWithChildrenAPI

Bases: TaskAPI

Task that can manage child tasks.

abstract add_child(child: TaskAPI) None

Add a child task.

children: set[TaskAPI]

abstract discard_child(child: TaskAPI) None

Remove a child task.

libp2p.tools.anyio_service.utils module

Utility helpers for the AnyIO service implementation.

libp2p.tools.anyio_service.utils.get_task_name(value: Any, explicit_name: str | None = None) str

Return a human-readable task name.

Mirrors the logic from the original implementation so that log messages stay identical after the refactor.
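
The naming logic itself is not documented here, so the following is only a guess at a typical fallback chain: use the explicit name when given, otherwise the object's `__name__`, otherwise its `repr`. `toy_task_name` and `fetch_peers` are hypothetical, and the real function may behave differently.

```python
from typing import Any, Optional


def toy_task_name(value: Any, explicit_name: Optional[str] = None) -> str:
    """Return a readable name for a task (assumed fallback order)."""
    if explicit_name is not None:
        return explicit_name
    inferred = getattr(value, "__name__", None)
    if isinstance(inferred, str):
        # Functions and classes carry their own name.
        return inferred
    return repr(value)


async def fetch_peers() -> None:
    """Hypothetical task function used only to demonstrate naming."""
```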

libp2p.tools.anyio_service.utils.is_verbose_logging_enabled() bool

Check if verbose logging is enabled via environment variable.

Module contents

AnyIO-based service framework (modularized).

This package provides a production-ready service implementation using AnyIO for structured concurrency and cross-platform async compatibility (asyncio + trio).

The implementation is split across multiple modules for clarity:

- exceptions: Service-specific exception types
- stats: Lightweight statistics types (named tuples)
- utils: Helper functions
- tasks: Task abstractions and implementations
- manager: AnyIOManager with full lifecycle management
- api: Service APIs, decorators, and base classes
- context: Context managers for running services

class libp2p.tools.anyio_service.AnyIOManager(service: ServiceAPI, max_children_per_task: int = 1000, logger: logging.Logger | None = None)

Bases: InternalManagerAPI

AnyIO-based service manager with full lifecycle management.

Implements proper lifecycle with locks, stats tracking, dual nursery architecture, task hierarchy, and ordered cancellation.

cancel() None

Cancel the service (non-blocking).

HIGH COMPLEXITY: Enhanced validation and state management.

property did_error: bool

Return True if the service encountered an error.

property is_cancelled: bool

Return True if the service has been cancelled.

property is_finished: bool

Return True if the service has finished.

property is_running: bool

Return True if the service is actively running.

property is_started: bool

Return True if the service has been started.

async run() None

Main run loop with proper lifecycle management.

HIGH COMPLEXITY:

- Lock-based lifecycle protection
- Dual nursery architecture (system + task)
- Cancellation handler
- Error collection and propagation
- Proper cleanup on exit

run_child_service(service: ServiceAPI, daemon: bool = False, name: str | None = None) ManagerAPI

Run a child service.

HIGH COMPLEXITY:

- Creates ChildServiceTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Returns child manager for external control

run_daemon_child_service(service: ServiceAPI, name: str | None = None) ManagerAPI

Run a daemon child service.

run_daemon_task(async_fn: AsyncFn, *args: Any, name: str | None = None) None

Run a daemon task (expected to run indefinitely).

async classmethod run_service(service: ServiceAPI) None

Class method to run a service.

run_task(async_fn: AsyncFn, *args: Any, daemon: bool = False, name: str | None = None, _internal: bool = False) None

Run a task in the background.

HIGH COMPLEXITY:

- Creates FunctionTask with full lifecycle
- Finds parent using anyio.get_current_task()
- Adds to task hierarchy
- Schedules for execution

property stats: Stats

Return current statistics.

async stop() None

Stop the service and wait for completion.

async wait_finished() None

Wait for the service to finish.

async wait_started() None

Wait for the service to start.

exception libp2p.tools.anyio_service.DaemonTaskExit

Bases: ServiceException

Exception raised when a daemon task exits unexpectedly.

exception libp2p.tools.anyio_service.LifecycleError

Bases: ServiceException

Error raised when service operations are performed outside of service lifecycle.

class libp2p.tools.anyio_service.Service

Bases: ServiceAPI

AnyIO-based service implementation.

get_manager() ManagerAPI

External access to the manager.

property manager: InternalManagerAPI

Internal access to the manager (for subclasses).

exception libp2p.tools.anyio_service.ServiceException

Bases: Exception

Base class for Service exceptions.

class libp2p.tools.anyio_service.Stats(tasks: TaskStats)

Bases: NamedTuple

Overall service statistics.

tasks: TaskStats

Alias for field number 0

class libp2p.tools.anyio_service.TaskStats(total_count: int, finished_count: int)

Bases: NamedTuple

Statistics for task execution.

finished_count: int

Alias for field number 1

property pending_count: int

Return the number of tasks that are still running.

total_count: int

Alias for field number 0

class libp2p.tools.anyio_service.TaskType(value)

Bases: str, Enum

Task type identifiers for internal framework tasks.

Using an enum ensures no collision with user task names and provides a single source of truth for internal task naming.

INTERNAL_SERVICE_RUN = '_internal:service.run'
USER = 'user'

exception libp2p.tools.anyio_service.TooManyChildrenException

Bases: ServiceException

Raised when too many child tasks are spawned.

libp2p.tools.anyio_service.as_service(service_fn: Callable[[...], Awaitable[Any]]) type[ServiceAPI]

Convert a simple async function into a Service class.

libp2p.tools.anyio_service.background_anyio_service(service: ServiceAPI) AsyncIterator[ManagerAPI]

Run a service in the background using AnyIO’s structured concurrency.

The service runs for the duration of the context block and is cleaned up when the block exits.

This uses AnyIO and can work with either Trio or asyncio backends.

libp2p.tools.anyio_service.background_trio_service(service: ServiceAPI) AsyncIterator[ManagerAPI]

Run a service in the background using Trio’s structured concurrency.

The service runs for the duration of the context block and is cleaned up when the block exits.

Note: This uses TrioManager with pure Trio primitives for compatibility with existing Trio-based services (Swarm, PubSub, KadDHT, etc.).

libp2p.tools.anyio_service.external_api(func: TFunc) TFunc

Decorator to protect external API methods.

MEDIUM COMPLEXITY: Ensures that methods can only be called while the service is running. Uses AnyIO streams and task groups to race the API call against service shutdown.

libp2p.tools.anyio_service.get_task_name(value: Any, explicit_name: str | None = None) str

Return a human-readable task name.

Mirrors the logic from the original implementation so that log messages stay identical after the refactor.

libp2p.tools.anyio_service.is_verbose_logging_enabled() bool

Check if verbose logging is enabled via environment variable.