libp2p.pubsub package

Subpackages

Submodules

libp2p.pubsub.abc module

class libp2p.pubsub.abc.IPubsub

Bases: ServiceAPI

abstract property my_id: ID
abstract property protocols: Tuple[TProtocol, ...]
abstract async publish(topic_id: str, data: bytes) None
abstract remove_topic_validator(topic: str) None
abstract set_topic_validator(topic: str, validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]], is_async_validator: bool) None
abstract async subscribe(topic_id: str) ISubscriptionAPI
abstract property topic_ids: KeysView[str]
abstract async unsubscribe(topic_id: str) None
abstract async wait_until_ready() None
class libp2p.pubsub.abc.IPubsubRouter

Bases: ABC

abstract add_peer(peer_id: ID, protocol_id: TProtocol) None

Notifies the router that a new peer has been connected.

Parameters:

peer_id – id of peer to add

abstract attach(pubsub: Pubsub) None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters:

pubsub – pubsub instance to attach to

abstract get_protocols() List[TProtocol]
Returns:

the list of protocols supported by the router

abstract async handle_rpc(rpc: RPC, sender_peer_id: ID) None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

TODO: Check if this interface is ok. It’s not the exact same as the Go code, but the Go code is really confusing with the msg origin: it specifies rpc.from even when the rpc shouldn’t have a from.

Parameters:

rpc – rpc message

abstract async join(topic: str) None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters:

topic – topic to join

abstract async leave(topic: str) None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters:

topic – topic to leave

abstract async publish(msg_forwarder: ID, pubsub_msg: Message) None

Invoked to forward a new message that has been validated.

Parameters:
  • msg_forwarder – peer_id of message sender

  • pubsub_msg – pubsub message to forward

abstract remove_peer(peer_id: ID) None

Notifies the router that a peer has been disconnected.

Parameters:

peer_id – id of peer to remove

class libp2p.pubsub.abc.ISubscriptionAPI

Bases: AsyncContextManager[ISubscriptionAPI], AsyncIterable[Message]

abstract async get() Message
abstract async unsubscribe() None
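
The base classes listed above mean a subscription can be used both with async with and with async for. The following is a minimal sketch of that shape, not the library's implementation: QueueSubscriptionSketch and its feed helper are hypothetical names, the standard-library asyncio stands in for the trio channels used elsewhere in this package, and calling unsubscribe() on exit from the context manager is an assumption.

```python
import asyncio


class QueueSubscriptionSketch:
    """Hypothetical stand-in with the same surface as ISubscriptionAPI."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.active = True

    def feed(self, msg: bytes) -> None:
        # Hypothetical helper: simulates a message arriving from the network.
        self._queue.put_nowait(msg)

    async def get(self) -> bytes:
        return await self._queue.get()

    async def unsubscribe(self) -> None:
        self.active = False

    async def __aenter__(self) -> "QueueSubscriptionSketch":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Assumption: leaving the context manager unsubscribes.
        await self.unsubscribe()

    def __aiter__(self) -> "QueueSubscriptionSketch":
        return self

    async def __anext__(self) -> bytes:
        return await self.get()


async def demo():
    received = []
    async with QueueSubscriptionSketch() as sub:
        sub.feed(b"one")
        sub.feed(b"two")
        # Explicit get() and async iteration read from the same queue.
        received.append(await sub.get())
        async for msg in sub:
            received.append(msg)
            break
    return received, sub.active


received, still_active = asyncio.run(demo())
```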

libp2p.pubsub.exceptions module

exception libp2p.pubsub.exceptions.NoPubsubAttached

Bases: PubsubRouterError

exception libp2p.pubsub.exceptions.PubsubRouterError

Bases: BaseLibp2pError

libp2p.pubsub.floodsub module

class libp2p.pubsub.floodsub.FloodSub(protocols: Sequence[TProtocol])

Bases: IPubsubRouter

add_peer(peer_id: ID, protocol_id: TProtocol) None

Notifies the router that a new peer has been connected.

Parameters:

peer_id – id of peer to add

attach(pubsub: Pubsub) None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters:

pubsub – pubsub instance to attach to

get_protocols() List[TProtocol]
Returns:

the list of protocols supported by the router

async handle_rpc(rpc: RPC, sender_peer_id: ID) None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

Parameters:

rpc – rpc message

async join(topic: str) None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters:

topic – topic to join

async leave(topic: str) None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters:

topic – topic to leave

protocols: List[TProtocol]
async publish(msg_forwarder: ID, pubsub_msg: Message) None

Invoked to forward a new message that has been validated. This is where the “flooding” part of floodsub happens.

With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, as the router maintains a timed cache of previous messages, so that seen messages are not forwarded again. It also never forwards a message back to the source or to the peer that forwarded the message.

Parameters:
  • msg_forwarder – peer ID of the peer who forwards the message to us

  • pubsub_msg – pubsub message in protobuf
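
The peer-selection rule described for flooding can be sketched in plain Python. This is an illustration under stated assumptions, not the FloodSub code: flood_targets is a hypothetical helper, peers are represented as strings instead of ID objects, and the timed cache of seen messages is left out.

```python
from typing import Dict, Iterable, Set


def flood_targets(
    peer_topics: Dict[str, Set[str]],
    topics: Iterable[str],
    msg_forwarder: str,
    origin: str,
) -> Set[str]:
    """Return the peers a flooded message would be sent to."""
    targets: Set[str] = set()
    for topic in topics:
        # Every known peer subscribed to the topic is a candidate.
        targets |= peer_topics.get(topic, set())
    # Never send the message back to its forwarder or to its original source.
    targets.discard(msg_forwarder)
    targets.discard(origin)
    return targets


peer_topics = {"news": {"alice", "bob", "carol"}, "chat": {"bob", "dave"}}
targets = flood_targets(peer_topics, ["news"], msg_forwarder="bob", origin="alice")
print(sorted(targets))
```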

pubsub: Pubsub
remove_peer(peer_id: ID) None

Notifies the router that a peer has been disconnected.

Parameters:

peer_id – id of peer to remove

libp2p.pubsub.gossipsub module

class libp2p.pubsub.gossipsub.GossipSub(protocols: Sequence[TProtocol], degree: int, degree_low: int, degree_high: int, time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120)

Bases: IPubsubRouter, Service

add_peer(peer_id: ID, protocol_id: TProtocol) None

Notifies the router that a new peer has been connected.

Parameters:
  • peer_id – id of peer to add

  • protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub

attach(pubsub: Pubsub) None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters:

pubsub – pubsub instance to attach to

degree: int
degree_high: int
degree_low: int
async emit_control_message(control_msg: ControlMessage, to_peer: ID) None
async emit_graft(topic: str, to_peer: ID) None

Emit graft message, sent to to_peer, for topic.

async emit_ihave(topic: str, msg_ids: Any, to_peer: ID) None

Emit ihave message, sent to to_peer, for topic and msg_ids.

async emit_iwant(msg_ids: Any, to_peer: ID) None

Emit iwant message, sent to to_peer, for msg_ids.

async emit_prune(topic: str, to_peer: ID) None

Emit prune message, sent to to_peer, for topic.

fanout: Dict[str, Set[ID]]
fanout_heartbeat() None
get_protocols() List[TProtocol]
Returns:

the list of protocols supported by the router

gossip_heartbeat() DefaultDict[ID, Dict[str, List[str]]]
async handle_graft(graft_msg: ControlGraft, sender_peer_id: ID) None
async handle_ihave(ihave_msg: ControlIHave, sender_peer_id: ID) None

Checks the seen set and requests unknown messages with an IWANT message.
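
As a rough sketch of the check described here, the IDs to request are the advertised IDs that are not already in the seen set. The helper name ids_to_request and the use of plain strings for message IDs are assumptions made for illustration.

```python
from typing import Iterable, List, Set


def ids_to_request(advertised: Iterable[str], seen: Set[str]) -> List[str]:
    """Return advertised message IDs we have not seen, in order, without duplicates."""
    requested: List[str] = []
    for mid in advertised:
        if mid not in seen and mid not in requested:
            requested.append(mid)
    return requested


wanted = ids_to_request(["m1", "m2", "m3", "m2"], seen={"m1"})
print(wanted)
```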

async handle_iwant(iwant_msg: ControlIWant, sender_peer_id: ID) None

Forwards all request messages that are present in mcache to the requesting peer.

async handle_prune(prune_msg: ControlPrune, sender_peer_id: ID) None
async handle_rpc(rpc: RPC, sender_peer_id: ID) None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

Parameters:
  • rpc – RPC message

  • sender_peer_id – id of the peer who sent the message

async heartbeat() None

Call individual heartbeats.

Note: the heartbeats are awaited one after another because each heartbeat depends on the state changes made by the preceding heartbeat.

heartbeat_initial_delay: float
heartbeat_interval: int
async join(topic: str) None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters:

topic – topic to join

async leave(topic: str) None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters:

topic – topic to leave

mcache: MessageCache
mesh: Dict[str, Set[ID]]
mesh_heartbeat() Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]
pack_control_msgs(ihave_msgs: List[ControlIHave], graft_msgs: List[ControlGraft], prune_msgs: List[ControlPrune]) ControlMessage
peer_protocol: Dict[ID, TProtocol]
protocols: List[TProtocol]
async publish(msg_forwarder: ID, pubsub_msg: Message) None

Invoked to forward a new message that has been validated.

pubsub: Pubsub
remove_peer(peer_id: ID) None

Notifies the router that a peer has been disconnected.

Parameters:

peer_id – id of peer to remove

async run() None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

static select_from_minus(num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]) List[Any]

Randomly select at most num_to_select elements from the set (pool - minus).

Parameters:
  • num_to_select – number of elements to randomly select

  • pool – list of items to select from (excluding elements in minus)

  • minus – elements to be excluded from selection pool

Returns:

list of selected elements
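
One way to get the documented behaviour is shown below. This is a sketch that assumes hashable items and a non-negative num_to_select; the name select_from_minus_sketch is hypothetical and the code is not taken from the library.

```python
import random
from typing import Any, Iterable, List


def select_from_minus_sketch(
    num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
) -> List[Any]:
    """Randomly pick at most num_to_select items from pool that are not in minus."""
    excluded = set(minus)
    candidates = [item for item in pool if item not in excluded]
    if num_to_select >= len(candidates):
        # Fewer candidates than requested: return all of them.
        return candidates
    return random.sample(candidates, num_to_select)


pool = ["a", "b", "c", "d", "e"]
minus = ["b", "d"]
picked = select_from_minus_sketch(2, pool, minus)
everything = select_from_minus_sketch(10, pool, minus)
```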

time_to_live: int

libp2p.pubsub.mcache module

class libp2p.pubsub.mcache.CacheEntry(mid: Tuple[bytes, bytes], topics: Sequence[str])

Bases: object

mid: Tuple[bytes, bytes]
topics: List[str]

A logical representation of an entry in the mcache’s _history_.

class libp2p.pubsub.mcache.MessageCache(window_size: int, history_size: int)

Bases: object

get(mid: Tuple[bytes, bytes]) Message | None

Get a message from the mcache.

Parameters:

mid – (seqno, from_id) of the message to get.

Returns:

The rpc message associated with this mid

history: List[List[CacheEntry]]
history_size: int
msgs: Dict[Tuple[bytes, bytes], Message]
put(msg: Message) None

Put a message into the mcache.

Parameters:

msg – The rpc message to put in. Should contain seqno and from_id

shift() None

Shift the window over by 1 position, dropping the last element of the history.

window(topic: str) List[Tuple[bytes, bytes]]

Get the window for this topic.

Parameters:

topic – Topic whose message ids we desire.

Returns:

List of mids in the current window.

window_size: int
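
The interplay of put, window, and shift can be illustrated with a small stand-alone model. It is a sketch under assumptions, not the MessageCache source: WindowedCacheSketch is a hypothetical class, payloads are raw bytes instead of protobuf messages, and slot 0 is assumed to be the newest history slot.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Mid = Tuple[bytes, bytes]  # (seqno, from_id)


class WindowedCacheSketch:
    def __init__(self, window_size: int, history_size: int) -> None:
        self.window_size = window_size
        self.history_size = history_size
        self.msgs: Dict[Mid, bytes] = {}
        # history[0] is the newest slot; each slot holds (mid, topics) entries.
        self.history: List[List[Tuple[Mid, List[str]]]] = [
            [] for _ in range(history_size)
        ]

    def put(self, mid: Mid, topics: Sequence[str], payload: bytes) -> None:
        self.msgs[mid] = payload
        self.history[0].append((mid, list(topics)))

    def get(self, mid: Mid) -> Optional[bytes]:
        return self.msgs.get(mid)

    def window(self, topic: str) -> List[Mid]:
        # Only the newest window_size slots are eligible for gossip.
        return [
            mid
            for slot in self.history[: self.window_size]
            for mid, topics in slot
            if topic in topics
        ]

    def shift(self) -> None:
        # Drop the oldest slot and forget its messages, then open a new slot.
        for mid, _ in self.history.pop():
            self.msgs.pop(mid, None)
        self.history.insert(0, [])


cache = WindowedCacheSketch(window_size=1, history_size=2)
mid = (b"\x01", b"peer-a")
cache.put(mid, ["news"], b"hello")
in_window_before = cache.window("news")
cache.shift()
in_window_after = cache.window("news")
still_cached = cache.get(mid)
cache.shift()
after_expiry = cache.get(mid)
```

In this model a message leaves the gossip window after window_size shifts but stays retrievable with get until it has aged through all history_size slots.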

libp2p.pubsub.pubsub module

class libp2p.pubsub.pubsub.Pubsub(host: ~libp2p.host.host_interface.IHost, router: IPubsubRouter, cache_size: int = None, strict_signing: bool = True, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>)

Bases: Service, IPubsub

async continuously_read_stream(stream: INetStream) None

Read from input stream in an infinite loop. Process messages from other nodes.

Parameters:

stream – stream to continuously read from

counter: int
dead_peer_receive_channel: trio.MemoryReceiveChannel[ID]
event_handle_dead_peer_queue_started: Event
event_handle_peer_queue_started: Event
get_hello_packet() RPC

Generate a subscription message with all the topics we are subscribed to. The hello packet is only sent if we have subscribed topics.

get_msg_validators(msg: Message) Tuple[TopicValidator, ...]

Get all validators corresponding to the topics in the message.

Parameters:

msg – the message published to the topic

async handle_dead_peer_queue() None

Continuously read from the dead peer channel. For each peer received, close the stream to that peer and remove the peer's info from pubsub and the pubsub router.

async handle_peer_queue() None

Continuously read from the peer queue and, each time a new peer is found, open a stream to that peer using one of the pubsub protocols we support.

handle_subscription(origin_id: ID, sub_message: SubOpts) None

Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message.

Parameters:
  • origin_id – id of the peer who sent the subscription message

  • sub_message – RPC.SubOpts
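
The mapping update described for handle_subscription amounts to adding or removing a peer from a per-topic set. The sketch below assumes the subscription message carries a topic ID and a subscribe flag; apply_subscription and its parameter names are hypothetical, and peers are plain strings.

```python
from typing import Dict, Set


def apply_subscription(
    peer_topics: Dict[str, Set[str]], origin_id: str, topic_id: str, subscribe: bool
) -> None:
    """Mark origin_id as subscribed to or unsubscribed from topic_id."""
    if subscribe:
        peer_topics.setdefault(topic_id, set()).add(origin_id)
    elif topic_id in peer_topics:
        # discard() is a no-op if the peer was never subscribed.
        peer_topics[topic_id].discard(origin_id)


peer_topics: Dict[str, Set[str]] = {}
apply_subscription(peer_topics, "alice", "news", subscribe=True)
apply_subscription(peer_topics, "bob", "news", subscribe=True)
apply_subscription(peer_topics, "alice", "news", subscribe=False)
apply_subscription(peer_topics, "carol", "unknown-topic", subscribe=False)
print(peer_topics)
```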

host: IHost
async message_all_peers(raw_msg: bytes) None

Broadcast a message to peers.

Parameters:

raw_msg – raw contents of the message to broadcast

property my_id: ID
notify_subscriptions(publish_message: Message) None

Put incoming message from a peer onto my blocking queue.

Parameters:

publish_message – RPC.Message format

peer_receive_channel: trio.MemoryReceiveChannel[ID]
peer_topics: Dict[str, Set[ID]]
peers: Dict[ID, INetStream]
property protocols: Tuple[TProtocol, ...]
async publish(topic_id: str, data: bytes) None

Publish data to a topic.

Parameters:
  • topic_id – topic which we are going to publish the data to

  • data – data which we are publishing

async push_msg(msg_forwarder: ID, msg: Message) None

Push a pubsub message to others.

Parameters:
  • msg_forwarder – the peer who forwarded us the message.

  • msg – the message we are going to push out.

remove_topic_validator(topic: str) None

Remove the validator from the given topic.

Parameters:

topic – the topic to remove validator from

router: IPubsubRouter
async run() None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

seen_messages: LRU
set_topic_validator(topic: str, validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]], is_async_validator: bool) None

Register a validator under the given topic. One topic can have only one validator.

Parameters:
  • topic – the topic to register validator under

  • validator – the validator used to validate messages published to the topic

  • is_async_validator – indicate if the validator is an asynchronous validator
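
The registration rules above can be sketched with a small registry that stores one entry per topic and dispatches on the is_async flag. Everything in this block is illustrative: ValidatorEntry, set_validator, and run_validator are hypothetical names, senders and messages are simplified to str and bytes, accepting messages on topics without a validator is an assumption, and the standard-library asyncio is used here even though the package itself is built on trio.

```python
import asyncio
from typing import Awaitable, Callable, Dict, NamedTuple, Union

Validator = Callable[[str, bytes], Union[bool, Awaitable[bool]]]


class ValidatorEntry(NamedTuple):
    validator: Validator
    is_async: bool


registry: Dict[str, ValidatorEntry] = {}


def set_validator(topic: str, validator: Validator, is_async_validator: bool) -> None:
    # Registering again under the same topic replaces the previous validator.
    registry[topic] = ValidatorEntry(validator, is_async_validator)


async def run_validator(topic: str, sender: str, payload: bytes) -> bool:
    entry = registry.get(topic)
    if entry is None:
        # Assumption: a topic without a validator accepts everything.
        return True
    if entry.is_async:
        return await entry.validator(sender, payload)
    return entry.validator(sender, payload)


def non_empty(sender: str, payload: bytes) -> bool:
    return len(payload) > 0


async def short_enough(sender: str, payload: bytes) -> bool:
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return len(payload) <= 4


set_validator("sync-topic", non_empty, False)
set_validator("async-topic", short_enough, True)


async def demo():
    return [
        await run_validator("sync-topic", "alice", b""),
        await run_validator("async-topic", "alice", b"abcd"),
        await run_validator("other-topic", "alice", b"anything"),
    ]


results = asyncio.run(demo())
```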

sign_key: PrivateKey
async stream_handler(stream: INetStream) None

Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols.

Parameters:

stream – newly created stream

strict_signing: bool
async subscribe(topic_id: str) ISubscriptionAPI

Subscribe ourself to a topic.

Parameters:

topic_id – topic_id to subscribe to

subscribed_topics_receive: Dict[str, TrioSubscriptionAPI]
subscribed_topics_send: Dict[str, trio.MemorySendChannel[rpc_pb2.Message]]
property topic_ids: KeysView[str]
topic_validators: Dict[str, TopicValidator]
async unsubscribe(topic_id: str) None

Unsubscribe ourself from a topic.

Parameters:

topic_id – topic_id to unsubscribe from

async validate_msg(msg_forwarder: ID, msg: Message) None

Validate the received message.

Parameters:
  • msg_forwarder – the peer who forwarded us the message.

  • msg – the message.

async wait_until_ready() None
class libp2p.pubsub.pubsub.TopicValidator(validator, is_async)

Bases: NamedTuple

is_async: bool

Alias for field number 1

validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]]

Alias for field number 0

libp2p.pubsub.pubsub.get_content_addressed_msg_id(msg: Message) bytes
libp2p.pubsub.pubsub.get_peer_and_seqno_msg_id(msg: Message) bytes

libp2p.pubsub.pubsub_notifee module

class libp2p.pubsub.pubsub_notifee.PubsubNotifee(initiator_peers_queue: trio.MemorySendChannel[ID], dead_peers_queue: trio.MemorySendChannel[ID])

Bases: INotifee

async closed_stream(network: INetwork, stream: INetStream) None
Parameters:
  • network – network the stream was closed on

  • stream – stream that was closed

async connected(network: INetwork, conn: INetConn) None

Add peer_id to initiator_peers_queue so that this peer_id can be used to create a stream. We only want to have one pubsub stream with each peer.

Parameters:
  • network – network the connection was opened on

  • conn – connection that was opened

dead_peers_queue: trio.MemorySendChannel[ID]
async disconnected(network: INetwork, conn: INetConn) None

Add peer_id to dead_peers_queue so that pubsub and its router can remove this peer_id and close the stream to that peer.

Parameters:
  • network – network the connection was closed on

  • conn – connection that was closed

initiator_peers_queue: trio.MemorySendChannel[ID]
async listen(network: INetwork, multiaddr: Multiaddr) None
Parameters:
  • network – network the listener is listening on

  • multiaddr – multiaddress listener is listening on

async listen_close(network: INetwork, multiaddr: Multiaddr) None
Parameters:
  • network – network the listener is no longer listening on

  • multiaddr – multiaddress listener is no longer listening on

async opened_stream(network: INetwork, stream: INetStream) None
Parameters:
  • network – network the stream was opened on

  • stream – stream that was opened

libp2p.pubsub.subscription module

class libp2p.pubsub.subscription.BaseSubscriptionAPI

Bases: ISubscriptionAPI

class libp2p.pubsub.subscription.TrioSubscriptionAPI(receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message], unsubscribe_fn: Callable[[], Awaitable[None]])

Bases: BaseSubscriptionAPI

async get() Message
receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message]
async unsubscribe() None
unsubscribe_fn: Callable[[], Awaitable[None]]

libp2p.pubsub.typing module

libp2p.pubsub.validators module

libp2p.pubsub.validators.signature_validator(msg: Message) bool

Verify the message against the given public key.

Parameters:
  • pubkey – the public key which signs the message.

  • msg – the message signed.

Module contents