libp2p.rcmgr package

Submodules

libp2p.rcmgr.allowlist module

Allowlist implementation for the resource manager.

Only peers and multiaddresses listed in the allowlist can bypass the resource limits.

class libp2p.rcmgr.allowlist.Allowlist(config: AllowlistConfig | None = None)

Bases: object

Allowlist for bypassing resource limits.

The allowlist can contain:

- Specific peer IDs
- Specific multiaddresses
- Specific peer-multiaddr combinations
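The three entry kinds can be pictured as three independent sets. The following is a minimal sketch only: `SketchAllowlist` is a hypothetical stand-in, plain strings replace `ID` and `Multiaddr`, and checking each set independently is an assumption that may differ from the library's behaviour.

```python
class SketchAllowlist:
    """Hypothetical stand-in; plain strings replace ID and Multiaddr."""

    def __init__(self) -> None:
        self.peers: set[str] = set()
        self.multiaddrs: set[str] = set()
        self.peer_multiaddrs: set[tuple[str, str]] = set()

    def add_peer(self, peer_id: str) -> None:
        self.peers.add(peer_id)

    def add_peer_multiaddr(self, peer_id: str, multiaddr: str) -> None:
        self.peer_multiaddrs.add((peer_id, multiaddr))

    def allowed_peer(self, peer_id: str) -> bool:
        return peer_id in self.peers

    def allowed_peer_and_multiaddr(self, peer_id: str, multiaddr: str) -> bool:
        # Assumption: only an exact (peer, multiaddr) pair matches here.
        return (peer_id, multiaddr) in self.peer_multiaddrs

    def is_empty(self) -> bool:
        return not (self.peers or self.multiaddrs or self.peer_multiaddrs)


allowlist = SketchAllowlist()
allowlist.add_peer("peer-A")
allowlist.add_peer_multiaddr("peer-B", "/ip4/127.0.0.1/tcp/4001")
```

In this sketch, allowlisting the pair for `peer-B` does not make `allowed_peer("peer-B")` true, because the pair is stored separately from the peer set.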

add_multiaddr(multiaddr: Multiaddr | str) None

Add a multiaddress to the allowlist.

add_peer(peer_id: ID) None

Add a peer to the allowlist.

add_peer_multiaddr(peer_id: ID, multiaddr: Multiaddr | str) None

Add a peer-multiaddr combination to the allowlist.

allowed(multiaddr: Multiaddr | str) bool

Check if a multiaddress is allowlisted (alias for allowed_multiaddr).

This method name matches the Go implementation.

allowed_multiaddr(multiaddr: Multiaddr | str) bool

Check if a multiaddress is allowlisted.

allowed_peer(peer_id: ID) bool

Check if a peer is allowlisted.

allowed_peer_and_multiaddr(peer_id: ID, multiaddr: Multiaddr | str) bool

Check if a peer-multiaddr combination is allowlisted.

clear() None

Clear all allowlist entries.

get_allowed_multiaddrs() set[str]

Get all allowed multiaddresses.

get_allowed_peers() set[ID]

Get all allowed peers.

is_empty() bool

Check if the allowlist is empty.

remove_multiaddr(multiaddr: Multiaddr | str) None

Remove a multiaddress from the allowlist.

remove_peer(peer_id: ID) None

Remove a peer from the allowlist.

remove_peer_multiaddr(peer_id: ID, multiaddr: Multiaddr | str) None

Remove a peer-multiaddr combination from the allowlist.

class libp2p.rcmgr.allowlist.AllowlistConfig(peers: set[~libp2p.peer.id.ID] = <factory>, multiaddrs: set[str] = <factory>, peer_multiaddrs: set[tuple[~libp2p.peer.id.ID, str]] = <factory>)

Bases: object

Configuration for the allowlist.

multiaddrs: set[str]
peer_multiaddrs: set[tuple[ID, str]]
peers: set[ID]

libp2p.rcmgr.allowlist.new_allowlist() Allowlist

Create a new empty allowlist.

libp2p.rcmgr.allowlist.new_allowlist_with_config(config: AllowlistConfig) Allowlist

Create a new allowlist with the given configuration.
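The `<factory>` defaults in the AllowlistConfig signature are how dataclass fields declared with a default factory are displayed, which means each instance gets its own fresh set. A small sketch of that behaviour, using a hypothetical `SketchConfig` with string entries rather than the library's class:

```python
from dataclasses import dataclass, field


@dataclass
class SketchConfig:
    """Hypothetical stand-in for a config whose fields default to empty sets."""

    peers: set[str] = field(default_factory=set)
    multiaddrs: set[str] = field(default_factory=set)


first = SketchConfig()
second = SketchConfig()

# Mutating one instance's set leaves the other instance untouched,
# because default_factory builds a new set per instance.
first.peers.add("peer-A")
```

A config populated this way would then be handed to a constructor such as new_allowlist_with_config.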

libp2p.rcmgr.exceptions module

Resource manager exception classes.

exception libp2p.rcmgr.exceptions.InvalidResourceManagerState(message: str)

Bases: ResourceManagerException

Exception raised when the resource manager is in an invalid state.

exception libp2p.rcmgr.exceptions.MemoryLimitExceeded(current: int, attempted: int, limit: int, priority: int)

Bases: ResourceLimitExceeded

Exception raised when memory limit is exceeded.

exception libp2p.rcmgr.exceptions.ResourceLimitExceeded(scope_name: str | None = None, resource_type: str | None = None, requested: int | None = None, available: int | None = None, message: str | None = None)

Bases: ResourceManagerException

Exception raised when a resource limit is exceeded.

exception libp2p.rcmgr.exceptions.ResourceManagerException(message: str)

Bases: Exception

Base exception for all resource manager errors.

exception libp2p.rcmgr.exceptions.ResourceScopeClosed(scope_name: str | None = None, message: str | None = None)

Bases: ScopeClosedException

Legacy alias for ScopeClosedException.

exception libp2p.rcmgr.exceptions.ScopeClosedException(scope_name: str | None = None, message: str | None = None)

Bases: ResourceManagerException

Exception raised when trying to use a closed resource scope.

exception libp2p.rcmgr.exceptions.StreamOrConnLimitExceeded(current: int, attempted: int, limit: int, resource_type: str)

Bases: ResourceLimitExceeded

Exception for stream or connection limit exceeded errors.
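The base classes listed above determine which `except` clauses catch which errors. The sketch below re-declares part of the hierarchy locally, with simplified constructors, purely to illustrate the ordering; these are stand-ins and not the library's classes.

```python
# Local stand-ins mirroring the documented "Bases:" relationships.
class ResourceManagerException(Exception): ...
class ResourceLimitExceeded(ResourceManagerException): ...
class MemoryLimitExceeded(ResourceLimitExceeded): ...
class ScopeClosedException(ResourceManagerException): ...
class ResourceScopeClosed(ScopeClosedException): ...


def classify(exc: Exception) -> str:
    """Return a label for the most specific handler that catches exc."""
    try:
        raise exc
    except ResourceLimitExceeded:
        return "limit"
    except ScopeClosedException:
        return "closed"
    except ResourceManagerException:
        return "other"


labels = [
    classify(MemoryLimitExceeded("memory")),
    classify(ResourceScopeClosed("scope")),
    classify(ResourceManagerException("generic")),
]
```

Because the base class is listed last, the more specific handlers get the first chance to match; catching `ResourceManagerException` alone would cover every error in the module.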

libp2p.rcmgr.limits module

Resource limits and enums.

This module provides common enums and types used across the resource management system.

libp2p.rcmgr.manager module

class libp2p.rcmgr.manager.ConnectionScope(peer_id: str, resource_manager: libp2p.rcmgr.ResourceManager)

Bases: object

Represents a resource scope for a connection. Used for tracking and cleanup.

close() None

class libp2p.rcmgr.manager.ResourceLimits(max_connections: int = 1000, max_memory_mb: int = 512, max_streams: int = 10000)

Bases: object

Resource limits configuration

class libp2p.rcmgr.manager.ResourceManager(limits: ResourceLimits | None = None, allowlist: Allowlist | None = None, metrics: Metrics | None = None, allowlist_config: AllowlistConfig | None = None, enable_metrics: bool = True, connection_limits: ConnectionLimits | None = None, enable_connection_tracking: bool = True, memory_limits: MemoryConnectionLimits | None = None, enable_memory_limits: bool = True, enable_connection_pooling: bool = False, enable_memory_pooling: bool = False, enable_circuit_breaker: bool = True, enable_graceful_degradation: bool = True, enable_prometheus: bool = False, prometheus_port: int = 8000, prometheus_exporter: PrometheusExporter | None = None, enable_rate_limiting: bool = True, connections_per_peer_per_sec: float = 10.0, burst_connections_per_peer: float = 50.0, cidr_limits: list[tuple[str, int]] | None = None)

Bases: object

Resource manager for tracking and limiting libp2p resources.

Manages connections, memory usage, and streams with configurable limits.

acquire_connection(peer_id: str = '', endpoint_ip: str | None = None) bool

Acquire a connection resource

acquire_memory(size: int) bool

Acquire memory resource

acquire_scoped_stream(peer_id: str, direction: Direction, service: str | None = None, protocol: str | None = None) bool

Acquire a stream accounting for service/protocol (and per-peer) limits.

acquire_stream(peer_id: str, direction: Direction) bool

Acquire a stream resource

close() None

Close the resource manager

get_stats() dict[str, Any]

Get current resource statistics

is_resource_available(resource_type: str, amount: int = 1) bool

Check if a resource is available

open_connection(peer_id: Any | None = None, endpoint_ip: str | None = None) ConnectionScope | None

Open a connection resource for the given peer and return a scope object for tracking/cleanup.

release_connection(peer_id: str = '', endpoint_ip: str | None = None) None

Release a connection resource

release_memory(size: int) None

Release memory resource

release_scoped_stream(peer_id: str, direction: Direction, service: str | None = None, protocol: str | None = None) None

release_stream(peer_id: str, direction: Direction) None

Release a stream resource

set_protocol_peer_stream_limit(protocol: str, max_streams: int) None

set_protocol_stream_limit(protocol: str, max_streams: int) None

set_service_peer_stream_limit(service: str, max_streams: int) None

set_service_stream_limit(service: str, max_streams: int) None

libp2p.rcmgr.manager.new_resource_manager(limits: ResourceLimits | None = None, allowlist_config: AllowlistConfig | None = None, enable_metrics: bool = True, connection_limits: ConnectionLimits | None = None, enable_connection_tracking: bool = True, memory_limits: MemoryConnectionLimits | None = None, enable_memory_limits: bool = True, enable_connection_pooling: bool = True, enable_memory_pooling: bool = True, enable_circuit_breaker: bool = True, enable_graceful_degradation: bool = True) libp2p.rcmgr.ResourceManager

Create a new resource manager with reasonable defaults.
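The acquire and release methods come in pairs, so callers typically need to release on every exit path. The sketch below shows that pairing with a hypothetical `SketchManager` that only counts connections against one limit. It assumes a `False` return signals refusal, which the `bool` return type suggests but this reference does not state.

```python
class SketchManager:
    """Hypothetical stand-in that counts connections against a single limit."""

    def __init__(self, max_connections: int) -> None:
        self.max_connections = max_connections
        self.connections = 0

    def acquire_connection(self, peer_id: str = "") -> bool:
        if self.connections >= self.max_connections:
            return False
        self.connections += 1
        return True

    def release_connection(self, peer_id: str = "") -> None:
        self.connections = max(0, self.connections - 1)


def with_connection(manager: SketchManager, peer_id: str) -> bool:
    """Return True if the work ran, False if the connection was refused."""
    if not manager.acquire_connection(peer_id):
        return False
    try:
        # Work that uses the connection would go here.
        return True
    finally:
        # Always give the slot back, even if the work raises.
        manager.release_connection(peer_id)


manager = SketchManager(max_connections=1)
first = with_connection(manager, "peer-A")    # slot acquired, then released
manager.acquire_connection("peer-B")          # hold the only slot
refused = with_connection(manager, "peer-C")  # no slot left
```

The same try/finally shape would apply to the memory and stream pairs.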

libp2p.rcmgr.metrics module

Optimized resource metrics implementation.

This module collects metrics for the resource manager. It stores them in arrays rather than dictionaries to keep the cost of each update low.

class libp2p.rcmgr.metrics.Direction(value)

Bases: IntEnum

Direction enum for resource tracking

INBOUND = 0
OUTBOUND = 1

class libp2p.rcmgr.metrics.MetricType(value)

Bases: IntEnum

Enumeration of metric types for array indexing.

CONNECTIONS_INBOUND = 0
CONNECTIONS_OUTBOUND = 1
CONNECTION_BLOCKS = 5
MEMORY_BLOCKS = 6
MEMORY_USAGE = 2
PEAK_CONNECTIONS = 8
PEAK_MEMORY = 9
PEAK_STREAMS = 10
STREAMS_INBOUND = 3
STREAMS_OUTBOUND = 4
STREAM_BLOCKS = 7

class libp2p.rcmgr.metrics.Metrics

Bases: object

High-performance metrics with array storage.

This implementation uses pre-allocated arrays for O(1) access to metrics, avoiding dictionary overhead and string operations.

allow_conn(direction: str, use_fd: bool = True) None

Record an allowed connection (backward compatibility).

allow_memory(size: int) None

Record allowed memory (backward compatibility).

allow_stream(peer_id: str, direction: str) None

Record an allowed stream (backward compatibility).

block_conn(direction: str, use_fd: bool = True) None

Record a blocked connection (backward compatibility).

block_memory(size: int) None

Record blocked memory (backward compatibility).

block_stream(peer_id: str, direction: str) None

Record a blocked stream (backward compatibility).

get_counter(metric: MetricType) int

Get a counter metric value.

Args:

metric: The metric type to get

Returns:

The counter value

get_gauge(metric: MetricType) float

Get a gauge metric value.

Args:

metric: The metric type to get

Returns:

The gauge value

get_summary() dict[str, Any]

Get a summary of all metrics.

Returns:

Dictionary containing all metric values

increment(metric: MetricType, delta: int = 1) None

Increment a counter metric.

Args:

metric: The metric type to increment
delta: Amount to increment by

record_block(resource_type: str) None

Record a resource block event.

Args:

resource_type: Type of resource that was blocked

record_connection(direction: str, delta: int = 1) None

Record a connection event.

Args:

direction: Connection direction ('inbound' or 'outbound')
delta: Change in connection count

record_memory(size: int, delta: int = 1) None

Record a memory event.

Args:

size: Memory size in bytes
delta: Change in memory count

record_stream(direction: str, delta: int = 1) None

Record a stream event.

Args:

direction: Stream direction ('inbound' or 'outbound')
delta: Change in stream count

release_memory(size: int) None

Record released memory (backward compatibility).

remove_conn(direction: str, use_fd: bool = True) None

Record a removed connection (backward compatibility).

remove_stream(direction: str) None

Record a removed stream (backward compatibility).

reset() None

Reset all metrics to zero.

set_gauge(metric: MetricType, value: float) None

Set a gauge metric value.

Args:

metric: The metric type to set
value: The value to set
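The array storage described for Metrics relies on IntEnum members being usable as list indexes. A minimal sketch of that idea follows; `SketchMetric` and `SketchMetrics` are hypothetical names with a reduced set of members, and this is not the library's implementation.

```python
from enum import IntEnum


class SketchMetric(IntEnum):
    """Hypothetical subset of metric types; values double as array indexes."""

    CONNECTIONS_INBOUND = 0
    CONNECTIONS_OUTBOUND = 1
    MEMORY_USAGE = 2


class SketchMetrics:
    def __init__(self) -> None:
        # One pre-allocated slot per enum member.
        self._counters = [0] * len(SketchMetric)

    def increment(self, metric: SketchMetric, delta: int = 1) -> None:
        # An IntEnum member is an int, so it indexes the list directly.
        self._counters[metric] += delta

    def get_counter(self, metric: SketchMetric) -> int:
        return self._counters[metric]

    def reset(self) -> None:
        for index in range(len(self._counters)):
            self._counters[index] = 0


metrics = SketchMetrics()
metrics.increment(SketchMetric.CONNECTIONS_INBOUND)
metrics.increment(SketchMetric.MEMORY_USAGE, 1024)
```

Lookups and updates are plain list indexing, with no string keys or hashing involved.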

class libp2p.rcmgr.metrics.ResourceMetrics(allowed: int = 0, blocked: int = 0, current_usage: int = 0, peak_usage: int = 0, last_updated: float = <factory>)

Bases: object

Metrics for a specific resource type.

allowed: int = 0
blocked: int = 0
current_usage: int = 0
last_updated: float
peak_usage: int = 0

Module contents

Resource manager package exports.

Exposes the small set of public symbols that the rest of the codebase and the tests import from libp2p.rcmgr, so callers can write, for example:

from libp2p.rcmgr import Direction, ResourceManager, new_resource_manager

This keeps the package import surface explicit and stable.

class libp2p.rcmgr.Direction(value)

Bases: IntEnum

Direction enum for resource tracking

INBOUND = 0
OUTBOUND = 1

class libp2p.rcmgr.ResourceLimits(max_connections: int = 1000, max_memory_mb: int = 512, max_streams: int = 10000)

Bases: object

Resource limits configuration

class libp2p.rcmgr.ResourceManager(limits: ResourceLimits | None = None, allowlist: Allowlist | None = None, metrics: Metrics | None = None, allowlist_config: AllowlistConfig | None = None, enable_metrics: bool = True, connection_limits: ConnectionLimits | None = None, enable_connection_tracking: bool = True, memory_limits: MemoryConnectionLimits | None = None, enable_memory_limits: bool = True, enable_connection_pooling: bool = False, enable_memory_pooling: bool = False, enable_circuit_breaker: bool = True, enable_graceful_degradation: bool = True, enable_prometheus: bool = False, prometheus_port: int = 8000, prometheus_exporter: PrometheusExporter | None = None, enable_rate_limiting: bool = True, connections_per_peer_per_sec: float = 10.0, burst_connections_per_peer: float = 50.0, cidr_limits: list[tuple[str, int]] | None = None)

Bases: object

Resource manager for tracking and limiting libp2p resources.

Manages connections, memory usage, and streams with configurable limits.

acquire_connection(peer_id: str = '', endpoint_ip: str | None = None) bool

Acquire a connection resource

acquire_memory(size: int) bool

Acquire memory resource

acquire_scoped_stream(peer_id: str, direction: Direction, service: str | None = None, protocol: str | None = None) bool

Acquire a stream accounting for service/protocol (and per-peer) limits.

acquire_stream(peer_id: str, direction: Direction) bool

Acquire a stream resource

close() None

Close the resource manager

get_stats() dict[str, Any]

Get current resource statistics

is_resource_available(resource_type: str, amount: int = 1) bool

Check if a resource is available

open_connection(peer_id: Any | None = None, endpoint_ip: str | None = None) ConnectionScope | None

Open a connection resource for the given peer and return a scope object for tracking/cleanup.

release_connection(peer_id: str = '', endpoint_ip: str | None = None) None

Release a connection resource

release_memory(size: int) None

Release memory resource

release_scoped_stream(peer_id: str, direction: Direction, service: str | None = None, protocol: str | None = None) None

release_stream(peer_id: str, direction: Direction) None

Release a stream resource

set_protocol_peer_stream_limit(protocol: str, max_streams: int) None

set_protocol_stream_limit(protocol: str, max_streams: int) None

set_service_peer_stream_limit(service: str, max_streams: int) None

set_service_stream_limit(service: str, max_streams: int) None

exception libp2p.rcmgr.ResourceScopeClosed(scope_name: str | None = None, message: str | None = None)

Bases: ScopeClosedException

Legacy alias for ScopeClosedException.

libp2p.rcmgr.new_resource_manager(limits: ResourceLimits | None = None, allowlist_config: AllowlistConfig | None = None, enable_metrics: bool = True, connection_limits: ConnectionLimits | None = None, enable_connection_tracking: bool = True, memory_limits: MemoryConnectionLimits | None = None, enable_memory_limits: bool = True, enable_connection_pooling: bool = True, enable_memory_pooling: bool = True, enable_circuit_breaker: bool = True, enable_graceful_degradation: bool = True) libp2p.rcmgr.ResourceManager

Create a new resource manager with reasonable defaults.