libp2p.network package

Subpackages

Submodules

libp2p.network.exceptions module

exception libp2p.network.exceptions.RateLimitError(message: str, consumed_points: int, remaining_points: int = 0)

Bases: Exception

Raised when rate limit is exceeded.

exception libp2p.network.exceptions.SwarmDialAllFailedError(message: str, peer_id: object | None = None, num_addrs_tried: int = 0)

Bases: SwarmException

Raised when all addresses for a peer have been tried and none succeeded.

exception libp2p.network.exceptions.SwarmException

Bases: BaseLibp2pError
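The exception hierarchy above can be illustrated with a minimal sketch. The classes below are local stand-ins that mirror the documented signatures, not imports from libp2p. Storing peer_id and num_addrs_tried as attributes is an assumption, and dial_any and try_dial are hypothetical helpers introduced only for illustration.

```python
class BaseLibp2pError(Exception):
    """Stand-in for the library's base error type."""


class SwarmException(BaseLibp2pError):
    """Stand-in mirroring the documented hierarchy."""


class SwarmDialAllFailedError(SwarmException):
    """Stand-in mirroring the documented constructor signature."""

    def __init__(self, message, peer_id=None, num_addrs_tried=0):
        super().__init__(message)
        # Assumption: the constructor arguments are kept as attributes.
        self.peer_id = peer_id
        self.num_addrs_tried = num_addrs_tried


def dial_any(peer_id, addrs, try_dial):
    """Try each address in order; raise once every address has failed.

    try_dial is a hypothetical callable that returns a connection or
    raises SwarmException for a single address.
    """
    for addr in addrs:
        try:
            return try_dial(addr)
        except SwarmException:
            continue  # this address failed, move on to the next one
    raise SwarmDialAllFailedError(
        f"all {len(addrs)} addresses failed for {peer_id}",
        peer_id=peer_id,
        num_addrs_tried=len(addrs),
    )
```

Because SwarmDialAllFailedError derives from SwarmException, a caller that already catches SwarmException also catches the more specific error.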

libp2p.network.swarm module

class libp2p.network.swarm.Swarm(peer_id: ID, peerstore: IPeerStore, upgrader: TransportUpgrader, transport: ITransport, retry_config: RetryConfig | None = None, connection_config: ConnectionConfig | QUICTransportConfig | None = None, psk: str | None = None)

Bases: Service, INetworkService

async add_conn(muxed_conn: IMuxedConn, direction: str = 'unknown') SwarmConn

Add an IMuxedConn to Swarm as a SwarmConn, notify “connected”, and start monitoring the connection for new streams and disconnection.

Parameters

muxed_connIMuxedConn

The muxed connection to add

directionstr

Connection direction: “inbound” or “outbound”. Default: “unknown”

auto_connector: AutoConnector
background_nursery: Nursery | None
async close() None

Close the swarm instance and cleanup resources.

async close_peer(peer_id: ID) None

Close all connections to the specified peer.

Parameters

peer_idID

The peer ID to close connections for.

common_stream_handler: Callable[[object], Awaitable[None]]
connection_config: ConnectionConfig | QUICTransportConfig
connection_gate: ConnectionGate
connection_pruner: ConnectionPruner
connections: dict[ID, list[INetConn]]
property connections_legacy: dict[ID, INetConn]

Legacy 1:1 mapping for backward compatibility.

Returns

dict[ID, INetConn]

Legacy mapping with only the first connection per peer.
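As a rough sketch of the relationship between the two mappings, the legacy view can be derived from the multi-connection map by keeping the first entry per peer. The function name to_legacy_mapping is hypothetical, plain strings stand in for ID and INetConn, and skipping peers with an empty list is an assumption.

```python
def to_legacy_mapping(connections):
    """Collapse {peer: [conn, ...]} into {peer: first_conn}.

    Peers whose connection list is empty are skipped (an assumption).
    """
    return {peer: conns[0] for peer, conns in connections.items() if conns}
```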

async dial_addr(addr: Multiaddr, peer_id: ID) INetConn

Try to create a connection to peer_id at addr, using retry logic.

Parameters:
  • addr – the address we want to connect with

  • peer_id – the peer we want to connect to

Raises:

SwarmException – raised when an error occurs

Returns:

network connection

async dial_peer(peer_id: ID) list[INetConn]

Try to create connections to peer_id (go-libp2p style).

This method directly dials the peer using known addresses from peerstore. Connection gating is applied to filter addresses.

Parameters:

peer_id – the peer we want to dial

Raises:

SwarmException – raised when an error occurs

Returns:

list of network connections (INetConn)

dns_resolver: DNSResolver
event_background_nursery_created: Event
get_connection(peer_id: ID) INetConn | None

Get single connection for backward compatibility.

Parameters

peer_idID

The peer ID to get a connection for.

Returns

INetConn | None

The first available connection, or None if no connections exist.

get_connections(peer_id: ID | None = None) list[INetConn]

Get connections for peer (like JS getConnections, Go ConnsToPeer).

Parameters

peer_idID | None

The peer ID to get connections for. If None, returns all connections.

Returns

list[INetConn]

List of connections to the specified peer, or all connections if peer_id is None.
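The lookup behaviour described for get_connection and get_connections might look like the following sketch over a plain dict. The function names first_connection and connections_for are hypothetical stand-ins, and strings stand in for peer IDs and connections.

```python
def connections_for(connections, peer_id=None):
    """Return the connections of one peer, or all of them if peer_id is None."""
    if peer_id is not None:
        return list(connections.get(peer_id, []))
    # Flatten every peer's list into a single list.
    return [conn for conns in connections.values() for conn in conns]


def first_connection(connections, peer_id):
    """Return the first connection to the peer, or None if there is none."""
    conns = connections.get(peer_id, [])
    return conns[0] if conns else None
```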

get_connections_map() dict[ID, list[INetConn]]

Get all connections map (like JS getConnectionsMap).

Returns

dict[ID, list[INetConn]]

The complete mapping of peer IDs to their connection lists.

get_metrics() dict[str, int]

Get connection metrics (go-libp2p style).

Returns a simple dict with connection counts. For detailed metrics, use ResourceManager.

Returns

dict[str, int]

Connection metrics including total, inbound, and outbound counts.
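A minimal sketch of how such counts could be computed from connection directions. The key names "total", "inbound", and "outbound" are assumptions, since the exact keys are not listed here, and connection_metrics is a hypothetical helper that takes a list of direction strings.

```python
from collections import Counter


def connection_metrics(directions):
    """Count connections by direction; "unknown" only adds to the total."""
    counts = Counter(directions)
    return {
        "total": len(directions),
        "inbound": counts["inbound"],
        "outbound": counts["outbound"],
    }
```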

get_peer_id() ID

Retrieve the peer identifier for this network.

Returns

ID

The identifier of this peer.

get_tag_info(peer_id: ID) TagInfo | None

Get the metadata associated with a peer.

Parameters

peer_idID

The peer to get info for.

Returns

TagInfo | None

The tag info for the peer, or None if no tags recorded.

get_total_connections() int

Get total number of connections (inbound + outbound).

Returns

int

Total number of active connections

is_protected(peer_id: ID, tag: str = '') bool

Check if a peer is protected.

Parameters

peer_idID

The peer to check.

tagstr

If provided, check if protected by this specific tag. If empty string, check if protected by any tag.

Returns

bool

True if the peer is protected.

async listen(*multiaddrs: Multiaddr) bool
Parameters:

multiaddrs – one or many multiaddrs to start listening on

Returns:

True if listening succeeded on at least one multiaddr

For each multiaddr

  • Check if a listener for multiaddr exists already

  • If listener already exists, continue

  • Otherwise:

    • Capture multiaddr in conn handler

    • Have conn handler delegate to stream handler

    • Call listener listen with the multiaddr

    • Map multiaddr to listener
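The steps above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: FakeListener is a hypothetical stand-in for a transport listener, multiaddrs are plain strings, asyncio is used only to keep the sketch self-contained, and an address that already has a listener is skipped without counting as a new success.

```python
import asyncio


class FakeListener:
    """Hypothetical listener; binding fails for addresses containing 'bad'."""

    def __init__(self, conn_handler):
        self.conn_handler = conn_handler

    async def listen(self, maddr):
        if "bad" in maddr:
            raise OSError(f"cannot bind {maddr}")


async def listen(listeners, stream_handler, *multiaddrs):
    """Return True if at least one new listener was started."""
    started = False
    for maddr in multiaddrs:
        if maddr in listeners:
            continue  # a listener for this address already exists

        # Bind maddr as a default argument so each handler keeps its own
        # address instead of the loop variable's final value.
        async def conn_handler(raw_conn, maddr=maddr):
            await stream_handler(raw_conn, maddr)

        listener = FakeListener(conn_handler)
        try:
            await listener.listen(maddr)
        except OSError:
            continue  # this address failed; try the next one
        listeners[maddr] = listener  # map multiaddr to listener
        started = True
    return started
```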

listeners: dict[str, IListener]
async new_stream(peer_id: ID) INetStream

Create a new stream, load balancing across multiple connections to the peer.

When a stream semaphore is configured (via set_resource_manager), this method awaits an available slot instead of raising immediately when the stream limit is reached.

Parameters:

peer_id – peer_id of destination

Raises:

SwarmException – raised when an error occurs

Returns:

net stream instance
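The waiting behaviour described for the stream semaphore can be illustrated with a generic sketch. It uses asyncio.Semaphore purely for self-containedness and does not reproduce the library's internals. The names open_stream and demo are hypothetical, and the limit of 2 is an arbitrary example value.

```python
import asyncio


async def open_stream(slots, active, name):
    """Wait for a free slot instead of failing when the limit is reached."""
    async with slots:  # suspends here while all slots are taken
        active.append(name)
        in_flight = len(active)  # streams open at this moment
        await asyncio.sleep(0)  # simulate some work on the stream
        active.remove(name)
        return in_flight


async def demo(limit=2, requests=5):
    """Open several streams concurrently and report the peak concurrency."""
    slots = asyncio.Semaphore(limit)
    active = []
    peaks = await asyncio.gather(
        *(open_stream(slots, active, f"s{i}") for i in range(requests))
    )
    return max(peaks)
```

All five requests complete, but the number of streams open at the same time never exceeds the configured limit.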

notifees: list[INotifee]
async notify_all(notifier: Callable[[INotifee], Awaitable[None]]) None
async notify_closed_stream(stream: INetStream) None
async notify_connected(conn: INetConn) None
async notify_disconnected(conn: INetConn) None
async notify_listen(multiaddr: Multiaddr) None
async notify_listen_close(multiaddr: Multiaddr) None
async notify_opened_stream(stream: INetStream) None
peerstore: IPeerStore
protect(peer_id: ID, tag: str) None

Protect a peer from having its connection(s) pruned.

Protected peers will never be disconnected during connection pruning, regardless of their tag values.

Parameters

peer_idID

The peer to protect.

tagstr

Protection tag (different components can use different tags).

register_notifee(notifee: INotifee) None
Parameters:

notifee – object implementing Notifee interface

Returns:

None

remove_conn(swarm_conn: SwarmConn) None

Simply remove the connection from Swarm’s records, without closing the connection.

retry_config: RetryConfig
async run() None

Main service logic - implemented by subclasses.

self_id: ID
set_resource_manager(resource_manager: ResourceManager | None, enable_stream_semaphore: bool = True) None

Attach a ResourceManager to wire connection/stream scopes.

set_stream_handler(stream_handler: Callable[[object], Awaitable[None]]) None

Set the stream handler for incoming streams.

Parameters

stream_handlerStreamHandlerFn

The handler function to process incoming streams.

tag_peer(peer_id: ID, tag: str, value: int) None

Tag a peer with a string, associating a weight with the tag.

Tags are used for connection management decisions. Peers with higher total tag values are less likely to have their connections pruned.

Parameters

peer_idID

The peer to tag.

tagstr

The tag name.

valueint

The weight/value associated with the tag.

tag_store: TagStore
transport: ITransport
unprotect(peer_id: ID, tag: str) bool

Remove a protection that may have been placed on a peer.

Parameters

peer_idID

The peer to unprotect.

tagstr

The protection tag to remove.

Returns

bool

True if the peer is still protected by other tags, False otherwise.

untag_peer(peer_id: ID, tag: str) None

Remove the tagged value from the peer.

Parameters

peer_idID

The peer to untag.

tagstr

The tag name to remove.

async upgrade_inbound_raw_conn(raw_conn: IRawConnection, maddr: Multiaddr) IMuxedConn

Secure the inbound raw connection and upgrade it to a multiplexed connection.

Parameters:

raw_conn – the inbound raw connection to upgrade

Raises:

SwarmException – raised when security or muxer upgrade fails

Returns:

muxed connection with security and multiplexing established

async upgrade_outbound_raw_conn(raw_conn: IRawConnection, peer_id: ID, pre_scope: Any = None) SwarmConn

Secure the outgoing raw connection and upgrade it to a multiplexed connection.

Parameters:
  • raw_conn – the raw connection to upgrade

  • peer_id – the peer this connection is to

  • pre_scope – pre-upgrade resource scope (if any)

Raises:

SwarmException – raised when security or muxer upgrade fails

Returns:

network connection with security and multiplexing established

upgrader: TransportUpgrader
libp2p.network.swarm.create_default_stream_handler(network: INetworkService) Callable[[object], Awaitable[None]]

Module contents

Network layer for libp2p.

This package provides connection management, dial queue, reconnection, rate limiting, address management, DNS resolution, and tagging functionality.

class libp2p.network.AutoConnector(swarm: Swarm, auto_connect_interval: float = 30.0)

Bases: object

Auto-connector that maintains minimum connection count.

Periodically checks if the connection count is below the low watermark and attempts to connect to known peers from the peer store.

Similar to go-libp2p’s connection manager background dialer.

clear_all_cooldowns() None

Clear all peer cooldowns.

clear_cooldown(peer_id: ID) None

Clear the cooldown for a specific peer.

Parameters

peer_idID

The peer to clear cooldown for

async maybe_connect() None

Check if we should connect to more peers and do so if needed.

Called periodically by the background task, or can be called manually when a peer disconnects.

record_failed_connection(peer_id: ID) None

Record a failed connection attempt.

Updates the last attempt time for cooldown purposes.

Parameters

peer_idID

The peer we failed to connect to

record_successful_connection(peer_id: ID) None

Record a successful connection to a peer.

Clears the cooldown for this peer.

Parameters

peer_idID

The peer that we connected to
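The cooldown bookkeeping described by these methods might be sketched as below. CooldownTracker is a hypothetical stand-in, not the AutoConnector class itself. The in_cooldown method, the 60-second default, and the injectable clock are assumptions made for illustration.

```python
import time


class CooldownTracker:
    """Tracks the last failed attempt per peer (illustrative only)."""

    def __init__(self, cooldown_seconds=60.0, clock=time.monotonic):
        self._cooldown = cooldown_seconds
        self._clock = clock
        self._last_attempt = {}

    def record_failed_connection(self, peer_id):
        # Remember when we last tried, so the peer is skipped for a while.
        self._last_attempt[peer_id] = self._clock()

    def record_successful_connection(self, peer_id):
        # A success clears the cooldown for this peer.
        self._last_attempt.pop(peer_id, None)

    def clear_all_cooldowns(self):
        self._last_attempt.clear()

    def in_cooldown(self, peer_id):
        last = self._last_attempt.get(peer_id)
        return last is not None and self._clock() - last < self._cooldown
```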

async run_background_task(nursery: Nursery) None

Run the background task that periodically checks connection count.

Parameters

nurserytrio.Nursery

The nursery to run tasks in

async start() None

Start the auto-connector background task.

async stop() None

Stop the auto-connector.

class libp2p.network.CommonTags

Bases: object

Common tag names used throughout libp2p.

ACTIVE_STREAMS = 'active-streams'
APPLICATION = 'application'
BITSWAP = 'bitswap'
BOOTSTRAP = 'bootstrap'
DHT = 'dht'
KEEP_ALIVE = 'keep-alive'
PUBSUB = 'pubsub'
RELAY = 'relay'
class libp2p.network.ConnectionPruner(swarm: Swarm, allow_list: list[str] | list[Multiaddr] | None = None)

Bases: object

Connection pruner that selects connections to close when limits are exceeded.

Sorts connections by multiple criteria to determine which connections should be closed first when the connection limit is exceeded.

async maybe_prune_connections() None

Check if connections need to be pruned and prune if necessary.

Triggered when a new connection is opened or periodically. Uses high_watermark as the trigger point for pruning.
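A rough sketch of a watermark-triggered selection, assuming the connections are already ordered from most to least prunable. The function select_for_pruning is hypothetical. Pruning back down to high_watermark and skipping protected peers are assumptions about the policy, not a description of the library's exact behaviour.

```python
def select_for_pruning(ordered_conns, high_watermark, protected):
    """Pick connections to close once the count exceeds high_watermark.

    ordered_conns is a list of (peer_id, conn) pairs, most prunable first.
    """
    excess = len(ordered_conns) - high_watermark
    if excess <= 0:
        return []  # at or below the watermark: nothing to prune
    victims = []
    for peer_id, conn in ordered_conns:
        if peer_id in protected:
            continue  # protected peers are never pruned
        victims.append(conn)
        if len(victims) == excess:
            break
    return victims
```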

sort_connections(connections: list[INetConn], peer_values: dict[ID, int]) list[INetConn]

Sort connections for pruning priority.

Connections are sorted by the following criteria, in order:

  1. Peer tag value (lowest first)

  2. Stream count (lowest first)

  3. Direction (inbound first, then outbound) - TODO: when direction is available

  4. Connection age (oldest first)

Parameters

connectionslist[INetConn]

List of connections to sort

peer_valuesdict[ID, int]

Mapping of peer IDs to their tag values

Returns

list[INetConn]

Sorted list of connections (first = lowest priority, should be pruned first)
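The documented ordering can be expressed as a tuple sort key. In this sketch Conn is a hypothetical stand-in for INetConn with just the fields the ordering needs, sort_for_pruning is an illustrative name, and the direction criterion is left out because it is marked as a TODO above.

```python
from dataclasses import dataclass


@dataclass
class Conn:
    """Hypothetical connection record used only for this sketch."""

    peer_id: str
    stream_count: int
    created_at: float  # smaller value means an older connection


def sort_for_pruning(connections, peer_values):
    """Order connections so the first entries are pruned first."""
    return sorted(
        connections,
        key=lambda c: (
            peer_values.get(c.peer_id, 0),  # 1. lowest peer tag value
            c.stream_count,                 # 2. fewest streams
            c.created_at,                   # 4. oldest connection
        ),
    )
```

Python compares tuples element by element, so later criteria only break ties left by earlier ones.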

async start() None

Start the connection pruner.

async stop() None

Stop the connection pruner.

class libp2p.network.TagInfo(first_seen: float = <factory>, value: int = 0, tags: dict[str, int] = <factory>, conns: dict[str, float] = <factory>)

Bases: object

Metadata associated with a peer for connection management.

This matches the go-libp2p TagInfo struct.

Attributes

first_seenfloat

Unix timestamp when this peer was first seen.

valueint

Total aggregated tag value for this peer.

tagsdict[str, int]

Individual tag names mapped to their values.

connsdict[str, float]

Connection identifiers mapped to their creation time.

conns: dict[str, float]
first_seen: float
get_total_value() int

Calculate total value from all tags.

Returns

int

Sum of all tag values.

tags: dict[str, int]
to_dict() dict[str, Any]

Convert TagInfo to dictionary for serialization.

Returns

dict

Dictionary representation of the tag info.

value: int = 0
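A minimal dataclass sketch that mirrors the documented fields and methods. TagInfoSketch is a local stand-in, not the library class. Using time.time as the first_seen factory and dataclasses.asdict for to_dict are assumptions.

```python
import time
from dataclasses import asdict, dataclass, field


@dataclass
class TagInfoSketch:
    """Stand-in with the same field names as the documented TagInfo."""

    first_seen: float = field(default_factory=time.time)
    value: int = 0
    tags: dict = field(default_factory=dict)
    conns: dict = field(default_factory=dict)

    def get_total_value(self):
        # Sum of all individual tag values.
        return sum(self.tags.values())

    def to_dict(self):
        # Plain-dict form suitable for serialization.
        return asdict(self)
```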
class libp2p.network.TagStore

Bases: object

Thread-safe store for peer tags.

The TagStore manages tags for peers, allowing different components to assign importance values that influence connection pruning.

Similar to go-libp2p’s ConnManager tagging functionality.

clear_peer(peer_id: ID) None

Clear all tag data for a peer.

Parameters

peer_idID

The peer to clear.

get_all_peers() list[ID]

Get all peers with tags.

Returns

list[ID]

List of all peer IDs with tags.

get_protected_peers() list[ID]

Get all protected peers.

Returns

list[ID]

List of all protected peer IDs.

get_tag(peer_id: ID, tag: str) int

Get a specific tag value for a peer.

Parameters

peer_idID

The peer to get tag for.

tagstr

The tag name.

Returns

int

The tag value, or 0 if not found.

get_tag_info(peer_id: ID) TagInfo | None

Get the metadata associated with a peer.

Parameters

peer_idID

The peer to get info for.

Returns

TagInfo | None

The tag info for the peer, or None if no tags recorded.

get_tag_value(peer_id: ID) int

Get the total tag value for a peer.

Parameters

peer_idID

The peer to get value for.

Returns

int

Total tag value (sum of all tags), or 0 if not found.

is_protected(peer_id: ID, tag: str = '') bool

Check if a peer is protected.

Parameters

peer_idID

The peer to check.

tagstr

If provided, check if protected by this specific tag. If empty string, check if protected by any tag.

Returns

bool

True if the peer is protected.

protect(peer_id: ID, tag: str) None

Protect a peer from having its connection(s) pruned.

Tagging allows different parts of the system to manage protections without interfering with one another.

Calls to protect() with the same tag are idempotent.

Parameters

peer_idID

The peer to protect.

tagstr

Protection tag (different components can use different tags).

record_connection(peer_id: ID, conn_id: str) None

Record a new connection for a peer.

Parameters

peer_idID

The peer that connected.

conn_idstr

Identifier for the connection (e.g., remote multiaddr).

remove_connection(peer_id: ID, conn_id: str) None

Remove a connection record for a peer.

Parameters

peer_idID

The peer that disconnected.

conn_idstr

Identifier for the connection.

tag_peer(peer_id: ID, tag: str, value: int) None

Tag a peer with a string, associating a weight with the tag.

If the tag already exists, it will be overwritten with the new value.

Parameters

peer_idID

The peer to tag.

tagstr

The tag name.

valueint

The weight/value associated with the tag.

unprotect(peer_id: ID, tag: str) bool

Remove a protection that may have been placed on a peer.

Parameters

peer_idID

The peer to unprotect.

tagstr

The protection tag to remove.

Returns

bool

True if the peer is still protected by other tags, False otherwise.
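The protection semantics described above (per-tag protection, idempotent protect, and the return value of unprotect) can be sketched with a set of tags per peer. ProtectionTable is a hypothetical stand-in, not the TagStore implementation, and it omits any locking.

```python
class ProtectionTable:
    """Illustrative per-peer set of protection tags."""

    def __init__(self):
        self._protected = {}

    def protect(self, peer_id, tag):
        # Adding to a set makes repeated calls with the same tag idempotent.
        self._protected.setdefault(peer_id, set()).add(tag)

    def unprotect(self, peer_id, tag):
        """Return True if the peer is still protected by other tags."""
        tags = self._protected.get(peer_id)
        if tags is None:
            return False
        tags.discard(tag)
        if not tags:
            del self._protected[peer_id]
            return False
        return True

    def is_protected(self, peer_id, tag=""):
        tags = self._protected.get(peer_id, set())
        # Empty tag: protected by any tag; otherwise by that specific tag.
        return bool(tags) if tag == "" else tag in tags
```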

untag_peer(peer_id: ID, tag: str) None

Remove the tagged value from the peer.

Parameters

peer_idID

The peer to untag.

tagstr

The tag name to remove.

upsert_tag(peer_id: ID, tag: str, upsert_fn: Callable[[int], int]) None

Update an existing tag or insert a new one.

The upsert function is called with the current value of the tag (or zero if it doesn’t exist). The return value becomes the new value of the tag.

Parameters

peer_idID

The peer to tag.

tagstr

The tag name.

upsert_fnCallable[[int], int]

Function that takes current value and returns new value.
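The upsert contract can be illustrated on a plain dict of tags for a single peer. The helper upsert below is a hypothetical, simplified stand-in that leaves out the peer lookup.

```python
def upsert(tags, tag, upsert_fn):
    """Apply upsert_fn to the current value (0 if the tag is missing)."""
    tags[tag] = upsert_fn(tags.get(tag, 0))
```

For example, calling upsert twice with a function that adds 5 first creates the tag with value 5 and then raises it to 10.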

libp2p.network.upsert_add(delta: int) Callable[[int], int]

Create an upsert function that adds a delta to the current value.

Parameters

deltaint

The value to add.

Returns

Callable[[int], int]

Upsert function.

libp2p.network.upsert_bounded(delta: int, min_val: int, max_val: int) Callable[[int], int]

Create an upsert function that adds a delta but keeps within bounds.

Parameters

deltaint

The value to add.

min_valint

Minimum allowed value.

max_valint

Maximum allowed value.

Returns

Callable[[int], int]

Upsert function.

libp2p.network.upsert_set(value: int) Callable[[int], int]

Create an upsert function that sets a specific value.

Parameters

valueint

The value to set.

Returns

Callable[[int], int]

Upsert function.
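The three factory functions above can be sketched as closures. The names make_add, make_bounded, and make_set are local illustrations rather than the library functions. Clamping the sum to the range from min_val to max_val is an assumption about what "keeps within bounds" means.

```python
def make_add(delta):
    """Return a function that adds delta to the current value."""
    return lambda current: current + delta


def make_bounded(delta, min_val, max_val):
    """Return a function that adds delta, then clamps to [min_val, max_val]."""
    return lambda current: max(min_val, min(max_val, current + delta))


def make_set(value):
    """Return a function that ignores the current value and sets value."""
    return lambda _current: value
```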