libp2p.pubsub package

Py-libp2p provides a comprehensive PubSub implementation with support for both FloodSub and GossipSub protocols, including the latest GossipSub 1.2 specification with IDONTWANT control messages for improved bandwidth efficiency.

For detailed information about GossipSub 1.2 features and configuration, see GossipSub 1.2 Protocol Support.

Subpackages

Submodules

libp2p.pubsub.exceptions module

exception libp2p.pubsub.exceptions.NoPubsubAttached

Bases: PubsubRouterError

exception libp2p.pubsub.exceptions.PubsubRouterError

Bases: BaseLibp2pError

libp2p.pubsub.floodsub module

class libp2p.pubsub.floodsub.FloodSub(protocols: Sequence[TProtocol])

Bases: IPubsubRouter

add_peer(peer_id: ID, protocol_id: TProtocol | None) None

Notifies the router that a new peer has been connected.

Parameters:

peer_id – id of peer to add

attach(pubsub: Pubsub) None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters:

pubsub – pubsub instance to attach to

get_protocols() list[TProtocol]
Returns:

the list of protocols supported by the router

async handle_rpc(rpc: RPC, sender_peer_id: ID) None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed

Parameters:
  • rpc – RPC message

  • sender_peer_id – id of the peer who sent the message

async join(topic: str) None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters:

topic – topic to join

async leave(topic: str) None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters:

topic – topic to leave

mesh: dict[str, set[ID]]
peer_protocol: dict[ID, TProtocol]
protocols: list[TProtocol]
async publish(msg_forwarder: ID, pubsub_msg: Message) None

Invoked to forward a new message that has been validated. This is where the “flooding” part of floodsub happens.

With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, as the router maintains a timed cache of previous messages, so that seen messages are not further forwarded. It also never forwards a message back to the source or the peer that forwarded the message. :param msg_forwarder: peer ID of the peer who forwards the message to us :param pubsub_msg: pubsub message in protobuf.

pubsub: Pubsub | None
remove_peer(peer_id: ID) None

Notifies the router that a peer has been disconnected.

Parameters:

peer_id – id of peer to remove

time_since_last_publish: dict[str, int]

libp2p.pubsub.gossipsub module

class libp2p.pubsub.gossipsub.GossipSub(protocols: Sequence[TProtocol], degree: int, degree_low: int, degree_high: int, direct_peers: Sequence[PeerInfo] | None = None, time_to_live: int = 60, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120, direct_connect_initial_delay: float = 0.1, direct_connect_interval: int = 300, do_px: bool = False, px_peers_count: int = 16, prune_back_off: int = 60, unsubscribe_back_off: int = 10, score_params: ScoreParams | None = None, max_idontwant_messages: int = 10, adaptive_gossip_enabled: bool = True, spam_protection_enabled: bool = True, max_messages_per_topic_per_second: float = 10.0, eclipse_protection_enabled: bool = True, min_mesh_diversity_ips: int = 3)

Bases: IPubsubRouter, Service

adaptive_degree_high: int
adaptive_degree_low: int
adaptive_gossip_enabled: bool
add_peer(peer_id: ID, protocol_id: TProtocol | None) None

Notifies the router that a new peer has been connected.

Parameters:
  • peer_id – id of peer to add

  • protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub

attach(pubsub: Pubsub) None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters:

pubsub – pubsub instance to attach to

back_off: dict[str, dict[ID, int]]
degree: int
degree_high: int
degree_low: int
async direct_connect_heartbeat() None

Connect to direct peers.

direct_connect_initial_delay: float
direct_connect_interval: int
direct_peers: dict[ID, PeerInfo]
do_px: bool
dont_send_message_ids: dict[ID, set[bytes]]
eclipse_protection_enabled: bool
async emit_control_message(control_msg: ControlMessage, to_peer: ID) None
async emit_graft(topic: str, id: ID) None

Emit graft message, sent to to_peer, for topic.

async emit_idontwant(msg_ids: list[bytes], to_peer: ID) None

Emit idontwant message, sent to to_peer, for msg_ids.

async emit_ihave(topic: str, msg_ids: Any, to_peer: ID) None

Emit ihave message, sent to to_peer, for topic and msg_ids.

async emit_iwant(msg_ids: Any, to_peer: ID) None

Emit iwant message, sent to to_peer, for msg_ids.

async emit_prune(topic: str, to_peer: ID, do_px: bool, is_unsubscribe: bool) None

Emit graft message, sent to to_peer, for topic.

equivocation_detection: dict[tuple[bytes, bytes], Message]
fanout: dict[str, set[ID]]
fanout_heartbeat() None

Maintain fanout topics by: 1. Removing expired topics 2. Removing peers that are no longer in the topic 3. Adding new peers if needed to maintain the target degree

get_protocols() list[TProtocol]
Returns:

the list of protocols supported by the router

gossip_factor: float
gossip_heartbeat() DefaultDict[ID, dict[str, list[str]]]
async handle_graft(graft_msg: ControlGraft, sender_peer_id: ID) None
async handle_idontwant(idontwant_msg: ControlIDontWant, sender_peer_id: ID) None

Handle incoming IDONTWANT control message by adding message IDs to the peer’s dont_send_message_ids set.

This method enforces max_idontwant_messages limit to prevent memory exhaustion from peers sending excessive IDONTWANT messages. When the limit is reached, older entries may be dropped to make room for new ones.

Parameters:
  • idontwant_msg – The IDONTWANT control message

  • sender_peer_id – ID of the peer who sent the message

async handle_ihave(ihave_msg: ControlIHave, sender_peer_id: ID) None

Checks the seen set and requests unknown messages with an IWANT message.

async handle_iwant(iwant_msg: ControlIWant, sender_peer_id: ID) None

Forwards all request messages that are present in mcache to the requesting peer.

async handle_prune(prune_msg: ControlPrune, sender_peer_id: ID) None
async handle_rpc(rpc: RPC, sender_peer_id: ID) None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed

Parameters:
  • rpc – RPC message

  • sender_peer_id – id of the peer who sent the message

async heartbeat() None

Call individual heartbeats.

Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat

heartbeat_initial_delay: float
heartbeat_interval: int
async join(topic: str) None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters:

topic – topic to join

last_health_update: int
async leave(topic: str) None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters:

topic – topic to leave

max_idontwant_messages: int
max_messages_per_topic_per_second: float
mcache: MessageCache
mesh: dict[str, set[ID]]
mesh_heartbeat() tuple[DefaultDict[ID, list[str]], DefaultDict[ID, list[str]]]
message_rate_limits: dict[ID, dict[str, list[float]]]
min_mesh_diversity_ips: int
network_health_score: float
pack_control_msgs(ihave_msgs: list[ControlIHave] | None, graft_msgs: list[ControlGraft] | None, prune_msgs: list[ControlPrune] | None, idontwant_msgs: list[ControlIDontWant] | None = None) ControlMessage
peer_protocol: dict[ID, TProtocol]
protocols: list[TProtocol]
prune_back_off: int
async publish(msg_forwarder: ID, pubsub_msg: Message) None

Invoked to forward a new message that has been validated.

pubsub: Pubsub | None
px_peers_count: int
remove_peer(peer_id: ID) None

Notifies the router that a peer has been disconnected.

Parameters:

peer_id – id of peer to remove

async run() None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

static select_from_minus(num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]) list[Any]

Select at most num_to_select subset of elements from the set (pool - minus) randomly. :param num_to_select: number of elements to randomly select :param pool: list of items to select from (excluding elements in minus) :param minus: elements to be excluded from selection pool :return: list of selected elements

spam_protection_enabled: bool
supports_scoring(peer_id: ID) bool

Check if peer supports Gossipsub v1.1+ scoring features.

Parameters:

peer_id – The peer to check

Returns:

True if peer supports v1.1+ features, False otherwise

supports_v20_features(peer_id: ID) bool

Check if peer supports Gossipsub v2.0 features.

Parameters:

peer_id – The peer to check

Returns:

True if peer supports v2.0 features, False otherwise

time_since_last_publish: dict[str, int]
time_to_live: int
unsubscribe_back_off: int

libp2p.pubsub.mcache module

class libp2p.pubsub.mcache.CacheEntry(mid: tuple[bytes, bytes], topics: Sequence[str])

Bases: object

mid: tuple[bytes, bytes]
topics: list[str]

A logical representation of an entry in the mcache’s _history_.

class libp2p.pubsub.mcache.MessageCache(window_size: int, history_size: int)

Bases: object

get(mid: tuple[bytes, bytes]) Message | None

Get a message from the mcache.

Parameters:

mid – (seqno, from_id) of the message to get.

Returns:

The rpc message associated with this mid

history: list[list[CacheEntry]]
history_size: int
msgs: dict[tuple[bytes, bytes], Message]
put(msg: Message) None

Put a message into the mcache.

Parameters:

msg – The rpc message to put in. Should contain seqno and from_id

shift() None

Shift the window over by 1 position, dropping the last element of the history.

window(topic: str) list[tuple[bytes, bytes]]

Get the window for this topic.

Parameters:

topic – Topic whose message ids we desire.

Returns:

List of mids in the current window.

window_size: int

libp2p.pubsub.pubsub module

class libp2p.pubsub.pubsub.Pubsub(host: ~libp2p.abc.IHost, router: ~libp2p.abc.IPubsubRouter, cache_size: int | None = None, seen_ttl: int = 120, sweep_interval: int = 60, strict_signing: bool = True, msg_id_constructor: ~collections.abc.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>, max_concurrent_validator_count: int = 10, validation_cache_ttl: int = 300, validation_cache_size: int = 1000, validation_timeout: float = 5.0)

Bases: Service, IPubsub

add_to_blacklist(peer_id: ID) None

Add a peer to the blacklist. When a peer is blacklisted: - Any existing connection to that peer is immediately closed and removed - The peer is removed from all topic subscription mappings - Future connection attempts from this peer will be rejected - Messages forwarded by or originating from this peer will be dropped - The peer will not be able to participate in pubsub communication

Parameters:

peer_id – the peer ID to blacklist

blacklisted_peers: set[ID]
clear_blacklist() None

Clear all peers from the blacklist. This removes all blacklist restrictions, allowing previously blacklisted peers to: - Establish new connections - Send and forward messages - Participate in topic subscriptions

async continuously_read_stream(stream: INetStream) None

Read from input stream in an infinite loop. Process messages from other nodes.

Parameters:

stream – stream to continously read from

counter: int
dead_peer_receive_channel: trio.MemoryReceiveChannel[ID]
event_handle_dead_peer_queue_started: trio.Event
event_handle_peer_queue_started: trio.Event
get_blacklisted_peers() set[ID]

Get a copy of the current blacklisted peers. Returns a snapshot of all currently blacklisted peer IDs. These peers are completely isolated from pubsub communication - their connections are rejected and their messages are dropped.

Returns:

a set containing all blacklisted peer IDs

get_hello_packet() RPC

Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics.

get_message_id(msg: Message) bytes

Get the message ID for a given message using the configured message ID constructor.

This method provides a public interface for external components (like routers) to access message ID construction functionality.

Parameters:

msg – the message to get the ID for

Returns:

the message ID as bytes

get_msg_validators(msg: Message) tuple[TopicValidator, ...]

Get all validators corresponding to the topics in the message.

Parameters:

msg – the message published to the topic

async handle_dead_peer_queue() None

Continuously read from dead peer channel and close the stream between that peer and remove peer info from pubsub and pubsub router. Only removes the peer if there are no remaining active connections.

async handle_peer_queue() None

Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol pubsub protocols we support.

handle_subscription(origin_id: ID, sub_message: SubOpts) None

Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message.

Parameters:
  • origin_id – id of the peer who subscribe to the message

  • sub_message – RPC.SubOpts

host: IHost
is_peer_blacklisted(peer_id: ID) bool

Check if a peer is blacklisted.

Parameters:

peer_id – the peer ID to check

Returns:

True if peer is blacklisted, False otherwise

async message_all_peers(raw_msg: bytes) None

Broadcast a message to peers.

Parameters:

raw_msg – raw contents of the message to broadcast

property my_id: ID

Retrieve the identifier for this pubsub instance.

Returns

ID

The pubsub identifier.

notify_subscriptions(publish_message: Message) None

Put incoming message from a peer onto my blocking queue.

Parameters:

publish_message – RPC.Message format

peer_receive_channel: trio.MemoryReceiveChannel[ID]
peer_topics: dict[str, set[ID]]
peers: dict[ID, INetStream]
property protocols: tuple[TProtocol, ...]

Retrieve the protocols used by the pubsub system.

Returns

tuple[TProtocol, …]

A tuple of protocol identifiers.

async publish(topic_id: str | list[str], data: bytes) None

Publish data to a topic or multiple topics.

Parameters:
  • topic_id – topic (str) or topics (list[str]) to publish the data to

  • data – data which we are publishing

async push_msg(msg_forwarder: ID, msg: Message) None

Push a pubsub message to others.

Parameters:
  • msg_forwarder – the peer who forward us the message.

  • msg – the message we are going to push out.

remove_from_blacklist(peer_id: ID) None

Remove a peer from the blacklist. Once removed from the blacklist: - The peer can establish new connections to this node - Messages from this peer will be processed normally - The peer can participate in topic subscriptions and message forwarding

Parameters:

peer_id – the peer ID to remove from blacklist

remove_topic_validator(topic: str) None

Remove the validator from the given topic.

Parameters:

topic – the topic to remove validator from

router: IPubsubRouter
async run() None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

seen_messages: LastSeenCache
set_topic_validator(topic: str, validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]], is_async_validator: bool) None

Register a validator under the given topic. One topic can only have one validtor.

Parameters:
  • topic – the topic to register validator under

  • validator – the validator used to validate messages published to the topic

  • is_async_validator – indicate if the validator is an asynchronous validator

sign_key: PrivateKey | None
async stream_handler(stream: INetStream) None

Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols.

Parameters:

stream – newly created stream

strict_signing: bool
async subscribe(topic_id: str) ISubscriptionAPI

Subscribe ourself to a topic.

Parameters:

topic_id – topic_id to subscribe to

subscribed_topics_receive: dict[str, TrioSubscriptionAPI]
subscribed_topics_send: dict[str, trio.MemorySendChannel[rpc_pb2.Message]]
property topic_ids: KeysView[str]

Retrieve the set of topic identifiers.

Returns

KeysView[str]

A view of the topic identifiers.

topic_validators: dict[str, TopicValidator]
async unsubscribe(topic_id: str) None

Unsubscribe ourself from a topic.

Parameters:

topic_id – topic_id to unsubscribe from

async validate_msg(msg_forwarder: ID, msg: Message) None

Validate the received message with caching and timeout support.

Parameters:
  • msg_forwarder – the peer who forward us the message.

  • msg – the message.

validation_cache: ValidationCache
validation_timeout: float
async wait_until_ready() None

Wait until the pubsub system is fully initialized and ready.

async write_msg(stream: INetStream, rpc_msg: RPC) bool

Write an RPC message to a stream with proper error handling.

Implements WriteMsg similar to go-msgio which is used in go-libp2p Ref: https://github.com/libp2p/go-msgio/blob/master/protoio/uvarint_writer.go#L56

Parameters:
  • stream – stream to write the message to

  • rpc_msg – RPC message to write

Returns:

True if successful, False if stream was closed (StreamClosed) or reset (StreamReset)

class libp2p.pubsub.pubsub.TopicValidator(validator, is_async)

Bases: NamedTuple

is_async: bool

Alias for field number 1

validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]]

Alias for field number 0

class libp2p.pubsub.pubsub.ValidationCache(ttl: int = 300, max_size: int = 1000)

Bases: object

Cache for validation results to avoid redundant validation.

clear_expired() None

Clear expired entries from cache.

get(msg_id: bytes) ValidationResult | None

Get cached validation result if still valid.

put(msg_id: bytes, result: ValidationResult) None

Cache a validation result.

class libp2p.pubsub.pubsub.ValidationResult(is_valid: bool, timestamp: float, error_message: str | None = None)

Bases: NamedTuple

Result of message validation with caching metadata.

error_message: str | None

Alias for field number 2

is_valid: bool

Alias for field number 0

timestamp: float

Alias for field number 1

libp2p.pubsub.pubsub.get_content_addressed_msg_id(msg: Message, encoding: str | None = None) bytes

Generate content-addressed message ID using multibase encoding.

Parameters:
  • msg – Pubsub message

  • encoding – Encoding to use. When None the process-wide default from libp2p.encoding_config is used.

Returns:

Multibase-encoded message ID

libp2p.pubsub.pubsub.get_peer_and_seqno_msg_id(msg: Message) bytes

libp2p.pubsub.pubsub_notifee module

class libp2p.pubsub.pubsub_notifee.PubsubNotifee(initiator_peers_queue: trio.MemorySendChannel[ID], dead_peers_queue: trio.MemorySendChannel[ID])

Bases: INotifee

async closed_stream(network: INetwork, stream: INetStream) None

Called when a stream is closed.

Parameters

networkINetwork

The network instance on which the stream was closed.

streamINetStream

The stream that was closed.

async connected(network: INetwork, conn: INetConn) None

Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer.

Parameters:
  • network – network the connection was opened on

  • conn – connection that was opened

dead_peers_queue: trio.MemorySendChannel[ID]
async disconnected(network: INetwork, conn: INetConn) None

Add peer_id to dead_peers_queue, so that pubsub and its router can remove this peer_id and close the stream inbetween.

Parameters:
  • network – network the connection was opened on

  • conn – connection that was opened

initiator_peers_queue: trio.MemorySendChannel[ID]
async listen(network: INetwork, multiaddr: Multiaddr) None

Called when a listener starts on a multiaddress.

Parameters

networkINetwork

The network instance where the listener is active.

multiaddrMultiaddr

The multiaddress on which the listener is listening.

async listen_close(network: INetwork, multiaddr: Multiaddr) None

Called when a listener stops listening on a multiaddress.

Parameters

networkINetwork

The network instance where the listener was active.

multiaddrMultiaddr

The multiaddress that is no longer being listened on.

async opened_stream(network: INetwork, stream: INetStream) None

Called when a new stream is opened.

Parameters

networkINetwork

The network instance on which the stream was opened.

streamINetStream

The stream that was opened.

libp2p.pubsub.subscription module

class libp2p.pubsub.subscription.BaseSubscriptionAPI

Bases: ISubscriptionAPI

class libp2p.pubsub.subscription.TrioSubscriptionAPI(receive_channel: MemoryReceiveChannel[Message], unsubscribe_fn: Callable[[], Awaitable[None]])

Bases: BaseSubscriptionAPI

async get() Message

Retrieve the next message from the subscription.

Returns

rpc_pb2.Message

The next pubsub message.

receive_channel: MemoryReceiveChannel[Message]
async unsubscribe() None

Unsubscribe from the current topic.

unsubscribe_fn: Callable[[], Awaitable[None]]

libp2p.pubsub.validators module

libp2p.pubsub.validators.signature_validator(msg: Message) bool

Verify the message against the given public key.

Parameters:
  • pubkey – the public key which signs the message.

  • msg – the message signed.

Module contents