libp2p.pubsub package
Subpackages
Submodules
libp2p.pubsub.abc module
- class libp2p.pubsub.abc.IPubsub
Bases:
ServiceAPI
- abstract set_topic_validator(topic: str, validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]], is_async_validator: bool) None
- abstract async subscribe(topic_id: str) ISubscriptionAPI
- class libp2p.pubsub.abc.IPubsubRouter
Bases:
ABC
- abstract add_peer(peer_id: ID, protocol_id: TProtocol) None
Notifies the router that a new peer has been connected.
- Parameters:
peer_id – id of peer to add
- abstract attach(pubsub: Pubsub) None
Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters:
pubsub – pubsub instance to attach to
- abstract async handle_rpc(rpc: RPC, sender_peer_id: ID) None
Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed TODO: Check if this interface is ok. It’s not the exact same as the go code, but the go code is really confusing with the msg origin, they specify rpc.from even when the rpc shouldn’t have a from :param rpc: rpc message
- abstract async join(topic: str) None
Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters:
topic – topic to join
- abstract async leave(topic: str) None
Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters:
topic – topic to leave
- class libp2p.pubsub.abc.ISubscriptionAPI
Bases:
AsyncContextManager
[ISubscriptionAPI
],AsyncIterable
[Message
]
libp2p.pubsub.exceptions module
- exception libp2p.pubsub.exceptions.NoPubsubAttached
Bases:
PubsubRouterError
- exception libp2p.pubsub.exceptions.PubsubRouterError
Bases:
BaseLibp2pError
libp2p.pubsub.floodsub module
- class libp2p.pubsub.floodsub.FloodSub(protocols: Sequence[TProtocol])
Bases:
IPubsubRouter
- add_peer(peer_id: ID, protocol_id: TProtocol) None
Notifies the router that a new peer has been connected.
- Parameters:
peer_id – id of peer to add
- attach(pubsub: Pubsub) None
Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters:
pubsub – pubsub instance to attach to
- async handle_rpc(rpc: RPC, sender_peer_id: ID) None
Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.
- Parameters:
rpc – rpc message
- async join(topic: str) None
Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters:
topic – topic to join
- async leave(topic: str) None
Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters:
topic – topic to leave
- async publish(msg_forwarder: ID, pubsub_msg: Message) None
Invoked to forward a new message that has been validated. This is where the “flooding” part of floodsub happens.
With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, as the router maintains a timed cache of previous messages, so that seen messages are not further forwarded. It also never forwards a message back to the source or the peer that forwarded the message. :param msg_forwarder: peer ID of the peer who forwards the message to us :param pubsub_msg: pubsub message in protobuf.
libp2p.pubsub.gossipsub module
- class libp2p.pubsub.gossipsub.GossipSub(protocols: Sequence[TProtocol], degree: int, degree_low: int, degree_high: int, time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120)
Bases:
IPubsubRouter
,Service
- add_peer(peer_id: ID, protocol_id: TProtocol) None
Notifies the router that a new peer has been connected.
- Parameters:
peer_id – id of peer to add
protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub
- attach(pubsub: Pubsub) None
Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters:
pubsub – pubsub instance to attach to
- async emit_control_message(control_msg: ControlMessage, to_peer: ID) None
- async emit_ihave(topic: str, msg_ids: Any, to_peer: ID) None
Emit ihave message, sent to to_peer, for topic and msg_ids.
- async emit_iwant(msg_ids: Any, to_peer: ID) None
Emit iwant message, sent to to_peer, for msg_ids.
- async handle_graft(graft_msg: ControlGraft, sender_peer_id: ID) None
- async handle_ihave(ihave_msg: ControlIHave, sender_peer_id: ID) None
Checks the seen set and requests unknown messages with an IWANT message.
- async handle_iwant(iwant_msg: ControlIWant, sender_peer_id: ID) None
Forwards all request messages that are present in mcache to the requesting peer.
- async handle_prune(prune_msg: ControlPrune, sender_peer_id: ID) None
- async handle_rpc(rpc: RPC, sender_peer_id: ID) None
Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.
- Parameters:
rpc – RPC message
sender_peer_id – id of the peer who sent the message
- async heartbeat() None
Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat
- async join(topic: str) None
Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters:
topic – topic to join
- async leave(topic: str) None
Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters:
topic – topic to leave
- mcache: MessageCache
- mesh_heartbeat() Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]
- pack_control_msgs(ihave_msgs: List[ControlIHave], graft_msgs: List[ControlGraft], prune_msgs: List[ControlPrune]) ControlMessage
- async publish(msg_forwarder: ID, pubsub_msg: Message) None
Invoked to forward a new message that has been validated.
- remove_peer(peer_id: ID) None
Notifies the router that a peer has been disconnected.
- Parameters:
peer_id – id of peer to remove
- async run() None
Primary entry point for all service logic.
Note
This method should not be directly invoked by user code.
Services may be run using the following approaches.
- static select_from_minus(num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]) List[Any]
Select at most num_to_select subset of elements from the set (pool - minus) randomly. :param num_to_select: number of elements to randomly select :param pool: list of items to select from (excluding elements in minus) :param minus: elements to be excluded from selection pool :return: list of selected elements
libp2p.pubsub.mcache module
- class libp2p.pubsub.mcache.CacheEntry(mid: Tuple[bytes, bytes], topics: Sequence[str])
Bases:
object
- class libp2p.pubsub.mcache.MessageCache(window_size: int, history_size: int)
Bases:
object
- get(mid: Tuple[bytes, bytes]) Message | None
Get a message from the mcache.
- Parameters:
mid – (seqno, from_id) of the message to get.
- Returns:
The rpc message associated with this mid
- history: List[List[CacheEntry]]
- put(msg: Message) None
Put a message into the mcache.
- Parameters:
msg – The rpc message to put in. Should contain seqno and from_id
libp2p.pubsub.pubsub module
- class libp2p.pubsub.pubsub.Pubsub(host: ~libp2p.host.host_interface.IHost, router: IPubsubRouter, cache_size: int = None, strict_signing: bool = True, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>)
Bases:
Service
,IPubsub
- async continuously_read_stream(stream: INetStream) None
Read from input stream in an infinite loop. Process messages from other nodes.
- Parameters:
stream – stream to continously read from
- event_handle_dead_peer_queue_started: Event
- event_handle_peer_queue_started: Event
- get_hello_packet() RPC
Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics.
- get_msg_validators(msg: Message) Tuple[TopicValidator, ...]
Get all validators corresponding to the topics in the message.
- Parameters:
msg – the message published to the topic
- async handle_dead_peer_queue() None
Continuously read from dead peer channel and close the stream between that peer and remove peer info from pubsub and pubsub router.
- async handle_peer_queue() None
Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol pubsub protocols we support.
- handle_subscription(origin_id: ID, sub_message: SubOpts) None
Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message.
- Parameters:
origin_id – id of the peer who subscribe to the message
sub_message – RPC.SubOpts
- async message_all_peers(raw_msg: bytes) None
Broadcast a message to peers.
- Parameters:
raw_msg – raw contents of the message to broadcast
- notify_subscriptions(publish_message: Message) None
Put incoming message from a peer onto my blocking queue.
- Parameters:
publish_message – RPC.Message format
- peers: Dict[ID, INetStream]
- async publish(topic_id: str, data: bytes) None
Publish data to a topic.
- Parameters:
topic_id – topic which we are going to publish the data to
data – data which we are publishing
- async push_msg(msg_forwarder: ID, msg: Message) None
Push a pubsub message to others.
- Parameters:
msg_forwarder – the peer who forward us the message.
msg – the message we are going to push out.
- remove_topic_validator(topic: str) None
Remove the validator from the given topic.
- Parameters:
topic – the topic to remove validator from
- router: IPubsubRouter
- async run() None
Primary entry point for all service logic.
Note
This method should not be directly invoked by user code.
Services may be run using the following approaches.
- seen_messages: LRU
- set_topic_validator(topic: str, validator: Callable[[ID, Message], bool] | Callable[[ID, Message], Awaitable[bool]], is_async_validator: bool) None
Register a validator under the given topic. One topic can only have one validtor.
- Parameters:
topic – the topic to register validator under
validator – the validator used to validate messages published to the topic
is_async_validator – indicate if the validator is an asynchronous validator
- sign_key: PrivateKey
- async stream_handler(stream: INetStream) None
Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols.
- Parameters:
stream – newly created stream
- async subscribe(topic_id: str) ISubscriptionAPI
Subscribe ourself to a topic.
- Parameters:
topic_id – topic_id to subscribe to
- subscribed_topics_receive: Dict[str, TrioSubscriptionAPI]
- subscribed_topics_send: Dict[str, trio.MemorySendChannel[rpc_pb2.Message]]
- topic_validators: Dict[str, TopicValidator]
- async unsubscribe(topic_id: str) None
Unsubscribe ourself from a topic.
- Parameters:
topic_id – topic_id to unsubscribe from
- class libp2p.pubsub.pubsub.TopicValidator(validator, is_async)
Bases:
NamedTuple
libp2p.pubsub.pubsub_notifee module
- class libp2p.pubsub.pubsub_notifee.PubsubNotifee(initiator_peers_queue: trio.MemorySendChannel[ID], dead_peers_queue: trio.MemorySendChannel[ID])
Bases:
INotifee
- async closed_stream(network: INetwork, stream: INetStream) None
- Parameters:
network – network the stream was closed on
stream – stream that was closed
- async connected(network: INetwork, conn: INetConn) None
Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer.
- Parameters:
network – network the connection was opened on
conn – connection that was opened
- async disconnected(network: INetwork, conn: INetConn) None
Add peer_id to dead_peers_queue, so that pubsub and its router can remove this peer_id and close the stream inbetween.
- Parameters:
network – network the connection was opened on
conn – connection that was opened
- async listen(network: INetwork, multiaddr: Multiaddr) None
- Parameters:
network – network the listener is listening on
multiaddr – multiaddress listener is listening on
- async listen_close(network: INetwork, multiaddr: Multiaddr) None
- Parameters:
network – network the connection was opened on
multiaddr – multiaddress listener is no longer listening on
- async opened_stream(network: INetwork, stream: INetStream) None
- Parameters:
network – network the stream was opened on
stream – stream that was opened
libp2p.pubsub.subscription module
- class libp2p.pubsub.subscription.BaseSubscriptionAPI
Bases:
ISubscriptionAPI
- class libp2p.pubsub.subscription.TrioSubscriptionAPI(receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message], unsubscribe_fn: Callable[[], Awaitable[None]])
Bases:
BaseSubscriptionAPI
- receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message]