libp2p.pubsub package

Submodules

libp2p.pubsub.abc module

class libp2p.pubsub.abc.IPubsub

Bases: async_service.abc.ServiceAPI

abstract property my_id
abstract property protocols
abstract async publish(topic_id: str, data: bytes) → None
abstract remove_topic_validator(topic: str) → None
abstract set_topic_validator(topic: str, validator: Union[Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], bool], Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], Awaitable[bool]]], is_async_validator: bool) → None
abstract async subscribe(topic_id: str) → libp2p.pubsub.abc.ISubscriptionAPI
abstract property topic_ids
abstract async unsubscribe(topic_id: str) → None
abstract async wait_until_ready() → None
class libp2p.pubsub.abc.IPubsubRouter

Bases: abc.ABC

abstract add_peer(peer_id: libp2p.peer.id.ID, protocol_id: TProtocol) → None

Notifies the router that a new peer has been connected.

Parameters

peer_id – id of peer to add

abstract attach(pubsub: Pubsub) → None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters

pubsub – pubsub instance to attach to

abstract get_protocols() → List[TProtocol]
Returns

the list of protocols supported by the router

abstract async handle_rpc(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

TODO: Check whether this interface is acceptable. It is not exactly the same as the Go code, which is confusing about the message origin: it specifies rpc.from even when the RPC should not have a from field.

Parameters

rpc – rpc message

abstract async join(topic: str) → None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters

topic – topic to join

abstract async leave(topic: str) → None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters

topic – topic to leave

abstract async publish(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Invoked to forward a new message that has been validated.

Parameters
  • msg_forwarder – peer_id of message sender

  • pubsub_msg – pubsub message to forward

abstract remove_peer(peer_id: libp2p.peer.id.ID) → None

Notifies the router that a peer has been disconnected.

Parameters

peer_id – id of peer to remove

class libp2p.pubsub.abc.ISubscriptionAPI

Bases: contextlib.AbstractAsyncContextManager, collections.abc.AsyncIterable, typing.Generic

abstract async get() → libp2p.pubsub.pb.rpc_pb2.Message
abstract async unsubscribe() → None

libp2p.pubsub.exceptions module

exception libp2p.pubsub.exceptions.NoPubsubAttached

Bases: libp2p.pubsub.exceptions.PubsubRouterError

exception libp2p.pubsub.exceptions.PubsubRouterError

Bases: libp2p.exceptions.BaseLibp2pError

libp2p.pubsub.floodsub module

class libp2p.pubsub.floodsub.FloodSub(protocols: Sequence[TProtocol])

Bases: libp2p.pubsub.abc.IPubsubRouter

add_peer(peer_id: libp2p.peer.id.ID, protocol_id: TProtocol) → None

Notifies the router that a new peer has been connected.

Parameters

peer_id – id of peer to add

attach(pubsub: libp2p.pubsub.pubsub.Pubsub) → None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters

pubsub – pubsub instance to attach to

get_protocols() → List[TProtocol]
Returns

the list of protocols supported by the router

async handle_rpc(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

Parameters

rpc – rpc message

async join(topic: str) → None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters

topic – topic to join

async leave(topic: str) → None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters

topic – topic to leave

protocols: List[TProtocol] = None
async publish(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Invoked to forward a new message that has been validated. This is where the “flooding” part of floodsub happens.

With flooding, routing is almost trivial: for each incoming message, forward it to all known peers in the topic. There is a bit of logic, as the router maintains a timed cache of previous messages, so that seen messages are not forwarded again. It also never forwards a message back to the source or to the peer that forwarded the message.

Parameters
  • msg_forwarder – peer ID of the peer who forwarded the message to us

  • pubsub_msg – pubsub message in protobuf
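The peer selection described above can be illustrated with a small, self-contained sketch. It is not the FloodSub implementation: peer IDs are modeled as plain strings, flood_targets is a hypothetical helper name, and the timed cache of seen messages is left out.

```python
from typing import Dict, Iterable, List, Set


def flood_targets(
    peer_topics: Dict[str, Set[str]],
    topic_ids: Iterable[str],
    msg_forwarder: str,
    origin: str,
) -> List[str]:
    """Return the peers a flooded message would be forwarded to (sketch)."""
    targets: Set[str] = set()
    for topic in topic_ids:
        # Every known peer subscribed to the topic is a candidate.
        targets.update(peer_topics.get(topic, set()))
    # Never send the message back to its source or to the forwarding peer.
    targets.discard(msg_forwarder)
    targets.discard(origin)
    return sorted(targets)


peer_topics = {"news": {"alice", "bob", "carol"}, "chat": {"bob", "dave"}}
print(flood_targets(peer_topics, ["news"], msg_forwarder="bob", origin="alice"))
```

Here only carol remains, because alice originated the message and bob forwarded it.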

pubsub: Pubsub = None
remove_peer(peer_id: libp2p.peer.id.ID) → None

Notifies the router that a peer has been disconnected.

Parameters

peer_id – id of peer to remove

libp2p.pubsub.gossipsub module

class libp2p.pubsub.gossipsub.GossipSub(protocols: Sequence[TProtocol], degree: int, degree_low: int, degree_high: int, time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120)

Bases: libp2p.pubsub.abc.IPubsubRouter, async_service.base.Service

add_peer(peer_id: libp2p.peer.id.ID, protocol_id: TProtocol) → None

Notifies the router that a new peer has been connected.

Parameters
  • peer_id – id of peer to add

  • protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub

attach(pubsub: libp2p.pubsub.pubsub.Pubsub) → None

Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.

Parameters

pubsub – pubsub instance to attach to

degree: int = None
degree_high: int = None
degree_low: int = None
async emit_control_message(control_msg: libp2p.pubsub.pb.rpc_pb2.ControlMessage, to_peer: libp2p.peer.id.ID) → None
async emit_graft(topic: str, to_peer: libp2p.peer.id.ID) → None

Emit graft message, sent to to_peer, for topic.

async emit_ihave(topic: str, msg_ids: Any, to_peer: libp2p.peer.id.ID) → None

Emit ihave message, sent to to_peer, for topic and msg_ids.

async emit_iwant(msg_ids: Any, to_peer: libp2p.peer.id.ID) → None

Emit iwant message, sent to to_peer, for msg_ids.

async emit_prune(topic: str, to_peer: libp2p.peer.id.ID) → None

Emit prune message, sent to to_peer, for topic.

fanout: Dict[str, Set[ID]] = None
fanout_heartbeat() → None
get_protocols() → List[TProtocol]
Returns

the list of protocols supported by the router

gossip_heartbeat() → DefaultDict[libp2p.peer.id.ID, Dict[str, List[str]]]
async handle_graft(graft_msg: libp2p.pubsub.pb.rpc_pb2.ControlGraft, sender_peer_id: libp2p.peer.id.ID) → None
async handle_ihave(ihave_msg: libp2p.pubsub.pb.rpc_pb2.ControlIHave, sender_peer_id: libp2p.peer.id.ID) → None

Checks the seen set and requests unknown messages with an IWANT message.

async handle_iwant(iwant_msg: libp2p.pubsub.pb.rpc_pb2.ControlIWant, sender_peer_id: libp2p.peer.id.ID) → None

Forwards all requested messages that are present in mcache to the requesting peer.
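The IHAVE and IWANT handling described above boils down to two lookups. The following is a minimal sketch under simplifying assumptions: message IDs are plain strings, the seen set and the cache are ordinary Python containers, and both function names are hypothetical rather than part of GossipSub.

```python
from typing import Dict, Iterable, List, Set


def ids_to_request(advertised: Iterable[str], seen: Set[str]) -> List[str]:
    """IHAVE side: keep only advertised message ids we have not seen yet."""
    return [msg_id for msg_id in advertised if msg_id not in seen]


def msgs_to_forward(requested: Iterable[str], cache: Dict[str, bytes]) -> List[bytes]:
    """IWANT side: return the requested messages that are still cached."""
    return [cache[msg_id] for msg_id in requested if msg_id in cache]


seen = {"m1", "m2"}
cache = {"m1": b"hello", "m2": b"world"}

print(ids_to_request(["m1", "m3"], seen))    # ids we would ask for via IWANT
print(msgs_to_forward(["m2", "m9"], cache))  # messages we could send back
```

Requested IDs that are no longer cached are skipped silently in this sketch.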

async handle_prune(prune_msg: libp2p.pubsub.pb.rpc_pb2.ControlPrune, sender_peer_id: libp2p.peer.id.ID) → None
async handle_rpc(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None

Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.

Parameters
  • rpc – RPC message

  • sender_peer_id – id of the peer who sent the message

async heartbeat() → None

Call individual heartbeats.

Note: the heartbeats are awaited one after another because each heartbeat depends on the state changes made by the preceding heartbeat.

heartbeat_initial_delay: float = None
heartbeat_interval: int = None
async join(topic: str) → None

Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.

Parameters

topic – topic to join

async leave(topic: str) → None

Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.

Parameters

topic – topic to leave

mcache: MessageCache = None
mesh: Dict[str, Set[ID]] = None
mesh_heartbeat() → Tuple[DefaultDict[libp2p.peer.id.ID, List[str]], DefaultDict[libp2p.peer.id.ID, List[str]]]
pack_control_msgs(ihave_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlIHave], graft_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlGraft], prune_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlPrune]) → libp2p.pubsub.pb.rpc_pb2.ControlMessage
peer_protocol: Dict[ID, TProtocol] = None
protocols: List[TProtocol] = None
async publish(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Invoked to forward a new message that has been validated.

pubsub: Pubsub = None
remove_peer(peer_id: libp2p.peer.id.ID) → None

Notifies the router that a peer has been disconnected.

Parameters

peer_id – id of peer to remove

async run() → None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

static select_from_minus(num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]) → List[Any]

Randomly select at most num_to_select elements from the set (pool - minus).

Parameters
  • num_to_select – number of elements to randomly select

  • pool – list of items to select from (excluding elements in minus)

  • minus – elements to be excluded from the selection pool

Returns

list of selected elements
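One way to realize the selection described above is shown below. This is a sketch rather than the library's code: the name select_from_minus_sketch is hypothetical, and it assumes that the items are hashable and that num_to_select is non-negative.

```python
import random
from typing import Any, Iterable, List


def select_from_minus_sketch(
    num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
) -> List[Any]:
    """Randomly pick at most num_to_select items from pool that are not in minus."""
    minus_set = set(minus)
    candidates = [item for item in pool if item not in minus_set]
    if num_to_select >= len(candidates):
        # Fewer candidates than requested: return all of them.
        return candidates
    return random.sample(candidates, num_to_select)


picked = select_from_minus_sketch(2, ["a", "b", "c", "d"], ["b"])
print(picked)  # two distinct items drawn from a, c, d
```

Returning the whole candidate list when too few remain is what makes the result "at most" num_to_select elements.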

time_to_live: int = None

libp2p.pubsub.mcache module

class libp2p.pubsub.mcache.CacheEntry(mid: Tuple[bytes, bytes], topics: Sequence[str])

Bases: object

mid: Tuple[bytes, bytes] = None
topics: List[str] = None

A logical representation of an entry in the mcache’s _history_.

class libp2p.pubsub.mcache.MessageCache(window_size: int, history_size: int)

Bases: object

get(mid: Tuple[bytes, bytes]) → Optional[libp2p.pubsub.pb.rpc_pb2.Message]

Get a message from the mcache.

Parameters

mid – (seqno, from_id) of the message to get.

Returns

The rpc message associated with this mid

history: List[List[CacheEntry]] = None
history_size: int = None
msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message] = None
put(msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Put a message into the mcache.

Parameters

msg – The rpc message to put in. Should contain seqno and from_id

shift() → None

Shift the window over by 1 position, dropping the last element of the history.

window(topic: str) → List[Tuple[bytes, bytes]]

Get the window for this topic.

Parameters

topic – Topic whose message ids we desire.

Returns

List of mids in the current window.

window_size: int = None
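The interplay of put, window and shift can be illustrated with a simplified model. This is a sketch under stated assumptions, not the MessageCache implementation: SketchMessageCache is a hypothetical class, payloads are raw bytes instead of protobuf messages, topics are passed to put explicitly, history[0] is assumed to be the newest slot, and history_size is assumed to be at least 1.

```python
from typing import Dict, List, Optional, Sequence, Tuple

MessageID = Tuple[bytes, bytes]  # (seqno, from_id), mirroring the mid type above


class SketchMessageCache:
    def __init__(self, window_size: int, history_size: int) -> None:
        self.window_size = window_size
        self.history_size = history_size
        self.msgs: Dict[MessageID, bytes] = {}
        # history[0] is the newest slot; each slot holds (mid, topics) entries.
        self.history: List[List[Tuple[MessageID, List[str]]]] = [
            [] for _ in range(history_size)
        ]

    def put(self, mid: MessageID, payload: bytes, topics: Sequence[str]) -> None:
        self.msgs[mid] = payload
        self.history[0].append((mid, list(topics)))

    def get(self, mid: MessageID) -> Optional[bytes]:
        return self.msgs.get(mid)

    def window(self, topic: str) -> List[MessageID]:
        # Only the newest window_size slots are advertised via gossip.
        return [
            mid
            for slot in self.history[: self.window_size]
            for mid, topics in slot
            if topic in topics
        ]

    def shift(self) -> None:
        # Drop the oldest slot, forget its messages, and open a fresh newest slot.
        for mid, _ in self.history.pop():
            self.msgs.pop(mid, None)
        self.history.insert(0, [])


cache = SketchMessageCache(window_size=1, history_size=2)
mid = (b"1", b"peer")
cache.put(mid, b"hi", ["news"])
in_window = cache.window("news")       # the message is in the current window
cache.shift()
after_one_shift = cache.window("news")  # outside the window, but still cached
still_cached = cache.get(mid)
cache.shift()
after_two_shifts = cache.get(mid)       # dropped together with its slot
print(in_window, after_one_shift, still_cached, after_two_shifts)
```

In this model a message stays retrievable for history_size shifts but is only listed by window for the first window_size of them.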

libp2p.pubsub.pubsub module

class libp2p.pubsub.pubsub.Pubsub(host: libp2p.host.host_interface.IHost, router: IPubsubRouter, cache_size: int = None, strict_signing: bool = True, msg_id_constructor: Callable[[libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>)

Bases: async_service.base.Service, libp2p.pubsub.abc.IPubsub

async continuously_read_stream(stream: libp2p.network.stream.net_stream_interface.INetStream) → None

Read from input stream in an infinite loop. Process messages from other nodes.

Parameters

stream – stream to continuously read from

counter: int = None
dead_peer_receive_channel: 'trio.MemoryReceiveChannel[ID]' = None
event_handle_dead_peer_queue_started: trio.Event = None
event_handle_peer_queue_started: trio.Event = None
get_hello_packet() → libp2p.pubsub.pb.rpc_pb2.RPC

Generate a subscription message with all topics we are subscribed to. The hello packet is only sent if we have subscribed topics.

get_msg_validators(msg: libp2p.pubsub.pb.rpc_pb2.Message) → Tuple[libp2p.pubsub.pubsub.TopicValidator, ...]

Get all validators corresponding to the topics in the message.

Parameters

msg – the message published to the topic

async handle_dead_peer_queue() → None

Continuously read from the dead peer channel. For each dead peer, close the stream to that peer and remove the peer's info from pubsub and the pubsub router.

async handle_peer_queue() → None

Continuously read from the peer queue and, each time a new peer is found, open a stream to the peer using one of the pubsub protocols we support.

handle_subscription(origin_id: libp2p.peer.id.ID, sub_message: libp2p.pubsub.pb.rpc_pb2.SubOpts) → None

Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message.

Parameters
  • origin_id – id of the peer who sent the subscription message

  • sub_message – RPC.SubOpts
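The mapping update described above can be sketched with a plain dictionary shaped like peer_topics. The assumptions are labeled here and in the code: peer IDs are strings, the subscription message is reduced to a topic_id string and a subscribe flag, and apply_subscription is a hypothetical helper, not a Pubsub method.

```python
from typing import Dict, Set


def apply_subscription(
    peer_topics: Dict[str, Set[str]],
    origin_id: str,
    topic_id: str,
    subscribe: bool,
) -> None:
    """Mark origin_id as subscribed to or unsubscribed from topic_id (sketch)."""
    if subscribe:
        # Create the topic entry on first use, then record the peer.
        peer_topics.setdefault(topic_id, set()).add(origin_id)
    elif topic_id in peer_topics:
        # Unsubscribing a peer that was never recorded is a no-op.
        peer_topics[topic_id].discard(origin_id)


peer_topics: Dict[str, Set[str]] = {}
apply_subscription(peer_topics, "peer-a", "news", True)
apply_subscription(peer_topics, "peer-b", "news", True)
apply_subscription(peer_topics, "peer-a", "news", False)
print(sorted(peer_topics["news"]))
```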

host: IHost = None
async message_all_peers(raw_msg: bytes) → None

Broadcast a message to peers.

Parameters

raw_msg – raw contents of the message to broadcast

property my_id
notify_subscriptions(publish_message: libp2p.pubsub.pb.rpc_pb2.Message) → None

Put an incoming message from a peer onto our blocking queue.

Parameters

publish_message – RPC.Message format

peer_receive_channel: 'trio.MemoryReceiveChannel[ID]' = None
peer_topics: Dict[str, Set[ID]] = None
peers: Dict[ID, INetStream] = None
property protocols
async publish(topic_id: str, data: bytes) → None

Publish data to a topic.

Parameters
  • topic_id – topic which we are going to publish the data to

  • data – data which we are publishing

async push_msg(msg_forwarder: libp2p.peer.id.ID, msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Push a pubsub message to others.

Parameters
  • msg_forwarder – the peer who forwarded us the message.

  • msg – the message we are going to push out.

remove_topic_validator(topic: str) → None

Remove the validator from the given topic.

Parameters

topic – the topic to remove validator from

router: 'IPubsubRouter' = None
async run() → None

Primary entry point for all service logic.

Note

This method should not be directly invoked by user code.

Services may be run using the following approaches.

seen_messages: LRU = None
set_topic_validator(topic: str, validator: Union[Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], bool], Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], Awaitable[bool]]], is_async_validator: bool) → None

Register a validator under the given topic. One topic can only have one validator.

Parameters
  • topic – the topic to register validator under

  • validator – the validator used to validate messages published to the topic

  • is_async_validator – indicate if the validator is an asynchronous validator
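The one-validator-per-topic rule can be modeled with a dictionary keyed by topic. The sketch below is illustrative only: ValidatorRegistry and SketchTopicValidator are hypothetical names, validators take a string peer ID and raw bytes, and replacing an existing entry on re-registration is an assumption that is consistent with the rule but not confirmed by this page.

```python
from typing import Callable, Dict, NamedTuple


class SketchTopicValidator(NamedTuple):
    validator: Callable[..., object]  # field number 0
    is_async: bool                    # field number 1


class ValidatorRegistry:
    def __init__(self) -> None:
        self.topic_validators: Dict[str, SketchTopicValidator] = {}

    def set_topic_validator(
        self, topic: str, validator: Callable[..., object], is_async_validator: bool
    ) -> None:
        # Assumption: registering again under the same topic replaces the old entry.
        self.topic_validators[topic] = SketchTopicValidator(validator, is_async_validator)

    def remove_topic_validator(self, topic: str) -> None:
        self.topic_validators.pop(topic, None)


registry = ValidatorRegistry()
registry.set_topic_validator("news", lambda peer, msg: len(msg) < 10, False)
registry.set_topic_validator("news", lambda peer, msg: True, False)
entry = registry.topic_validators["news"]
print(len(registry.topic_validators), entry.is_async)
```

Because the entry is a named tuple, entry[0] and entry.validator refer to the same object, as do entry[1] and entry.is_async.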

sign_key: PrivateKey = None
async stream_handler(stream: libp2p.network.stream.net_stream_interface.INetStream) → None

Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols.

Parameters

stream – newly created stream

strict_signing: bool = None
async subscribe(topic_id: str) → libp2p.pubsub.abc.ISubscriptionAPI

Subscribe ourselves to a topic.

Parameters

topic_id – topic_id to subscribe to

subscribed_topics_receive: Dict[str, 'TrioSubscriptionAPI'] = None
subscribed_topics_send: Dict[str, 'trio.MemorySendChannel[rpc_pb2.Message]'] = None
property topic_ids
topic_validators: Dict[str, TopicValidator] = None
async unsubscribe(topic_id: str) → None

Unsubscribe ourselves from a topic.

Parameters

topic_id – topic_id to unsubscribe from

async validate_msg(msg_forwarder: libp2p.peer.id.ID, msg: libp2p.pubsub.pb.rpc_pb2.Message) → None

Validate the received message.

Parameters
  • msg_forwarder – the peer who forwarded us the message.

  • msg – the message.
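A rough sketch of how a mix of synchronous and asynchronous validators might be applied to one message follows. Several things here are assumptions: the library is built on trio, while this sketch uses the standard-library asyncio to stay self-contained; SketchValidationError and validate are hypothetical names; running the synchronous validators first is a design choice of the sketch, not a documented behavior.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, Union

SyncValidator = Callable[[str, bytes], bool]
AsyncValidator = Callable[[str, bytes], Awaitable[bool]]


class SketchValidationError(Exception):
    """Raised when any validator rejects the message (hypothetical name)."""


async def validate(
    msg_forwarder: str,
    msg: bytes,
    validators: List[Tuple[Union[SyncValidator, AsyncValidator], bool]],
) -> None:
    sync_validators = [v for v, is_async in validators if not is_async]
    async_validators = [v for v, is_async in validators if is_async]
    # Cheap synchronous checks run first and fail fast.
    for validator in sync_validators:
        if not validator(msg_forwarder, msg):
            raise SketchValidationError("sync validator rejected the message")
    # Asynchronous checks are awaited together.
    if async_validators:
        results = await asyncio.gather(
            *(validator(msg_forwarder, msg) for validator in async_validators)
        )
        if not all(results):
            raise SketchValidationError("async validator rejected the message")


def not_empty(peer: str, msg: bytes) -> bool:
    return len(msg) > 0


async def short_enough(peer: str, msg: bytes) -> bool:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return len(msg) <= 5


async def main() -> str:
    checks = [(not_empty, False), (short_enough, True)]
    await validate("peer-a", b"hello", checks)  # passes both validators
    try:
        await validate("peer-a", b"too long", checks)
    except SketchValidationError:
        return "rejected"
    return "accepted"


print(asyncio.run(main()))
```

The function returns None on success and raises on failure, which matches the → None return type shown for validate_msg.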

async wait_until_ready() → None
class libp2p.pubsub.pubsub.TopicValidator(validator, is_async)

Bases: tuple

property is_async

Alias for field number 1

property validator

Alias for field number 0

libp2p.pubsub.pubsub.get_content_addressed_msg_id(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bytes
libp2p.pubsub.pubsub.get_peer_and_seqno_msg_id(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bytes

libp2p.pubsub.pubsub_notifee module

class libp2p.pubsub.pubsub_notifee.PubsubNotifee(initiator_peers_queue: trio.MemorySendChannel[ID], dead_peers_queue: trio.MemorySendChannel[ID])

Bases: libp2p.network.notifee_interface.INotifee

async closed_stream(network: libp2p.network.network_interface.INetwork, stream: libp2p.network.stream.net_stream_interface.INetStream) → None
Parameters
  • network – network the stream was closed on

  • stream – stream that was closed

async connected(network: libp2p.network.network_interface.INetwork, conn: libp2p.network.connection.net_connection_interface.INetConn) → None

Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer.

Parameters
  • network – network the connection was opened on

  • conn – connection that was opened

dead_peers_queue: 'trio.MemorySendChannel[ID]' = None
async disconnected(network: libp2p.network.network_interface.INetwork, conn: libp2p.network.connection.net_connection_interface.INetConn) → None

Add peer_id to dead_peers_queue, so that pubsub and its router can remove this peer_id and close the stream between them.

Parameters
  • network – network the connection was closed on

  • conn – connection that was closed

initiator_peers_queue: 'trio.MemorySendChannel[ID]' = None
async listen(network: libp2p.network.network_interface.INetwork, multiaddr: multiaddr.multiaddr.Multiaddr) → None
Parameters
  • network – network the listener is listening on

  • multiaddr – multiaddress listener is listening on

async listen_close(network: libp2p.network.network_interface.INetwork, multiaddr: multiaddr.multiaddr.Multiaddr) → None
Parameters
  • network – network the listener was listening on

  • multiaddr – multiaddress listener is no longer listening on

async opened_stream(network: libp2p.network.network_interface.INetwork, stream: libp2p.network.stream.net_stream_interface.INetStream) → None
Parameters
  • network – network the stream was opened on

  • stream – stream that was opened

libp2p.pubsub.subscription module

class libp2p.pubsub.subscription.BaseSubscriptionAPI

Bases: libp2p.pubsub.abc.ISubscriptionAPI

class libp2p.pubsub.subscription.TrioSubscriptionAPI(receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message], unsubscribe_fn: Callable[[], Awaitable[None]])

Bases: libp2p.pubsub.subscription.BaseSubscriptionAPI

async get() → libp2p.pubsub.pb.rpc_pb2.Message
receive_channel: 'trio.MemoryReceiveChannel[rpc_pb2.Message]' = None
async unsubscribe() → None
unsubscribe_fn: UnsubscribeFn = None

libp2p.pubsub.validators module

libp2p.pubsub.validators.signature_validator(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bool

Verify the signature attached to the message.

Parameters

msg – the signed message.

Module contents