libp2p.pubsub package¶
Submodules¶
libp2p.pubsub.abc module¶
-
class
libp2p.pubsub.abc.
IPubsub
¶ Bases:
async_service.abc.ServiceAPI
-
abstract property
my_id
¶
-
abstract property
protocols
¶
-
abstract async
publish
(topic_id: str, data: bytes) → None¶
-
abstract
remove_topic_validator
(topic: str) → None¶
-
abstract
set_topic_validator
(topic: str, validator: Union[Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], bool], Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], Awaitable[bool]]], is_async_validator: bool) → None¶
-
abstract async
subscribe
(topic_id: str) → libp2p.pubsub.abc.ISubscriptionAPI¶
-
abstract property
topic_ids
¶
-
abstract async
unsubscribe
(topic_id: str) → None¶
-
abstract async
wait_until_ready
() → None¶
-
class
libp2p.pubsub.abc.
IPubsubRouter
¶ Bases:
abc.ABC
-
abstract
add_peer
(peer_id: libp2p.peer.id.ID, protocol_id: NewType.<locals>.new_type) → None¶ Notifies the router that a new peer has been connected.
- Parameters
peer_id – id of peer to add
protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub
-
abstract
attach
(pubsub: Pubsub) → None¶ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters
pubsub – pubsub instance to attach to
-
abstract
get_protocols
() → List[NewType.<locals>.new_type]¶ - Returns
the list of protocols supported by the router
-
abstract async
handle_rpc
(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None¶ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.
TODO: Check whether this interface is adequate. It is not exactly the same as the Go code, which is confusing about the message origin: it specifies rpc.from even when the rpc shouldn't have a from.
- Parameters
rpc – rpc message
-
abstract async
join
(topic: str) → None¶ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters
topic – topic to join
-
abstract async
leave
(topic: str) → None¶ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters
topic – topic to leave
-
abstract async
publish
(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Invoked to forward a new message that has been validated.
- Parameters
msg_forwarder – peer_id of message sender
pubsub_msg – pubsub message to forward
-
abstract
remove_peer
(peer_id: libp2p.peer.id.ID) → None¶ Notifies the router that a peer has been disconnected.
- Parameters
peer_id – id of peer to remove
-
class
libp2p.pubsub.abc.
ISubscriptionAPI
¶ Bases:
contextlib.AbstractAsyncContextManager
,collections.abc.AsyncIterable
,typing.Generic
-
abstract async
get
() → libp2p.pubsub.pb.rpc_pb2.Message¶
-
abstract async
unsubscribe
() → None¶
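The base classes listed for ISubscriptionAPI imply that a subscription can be used as an async context manager and iterated with `async for`, in addition to calling get and unsubscribe directly. The following is a minimal sketch of that shape, not the library's implementation: `QueueSubscription` is a hypothetical class, it carries raw bytes instead of rpc_pb2.Message, and it uses the standard-library asyncio where the library itself uses trio channels.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Tuple


class QueueSubscription:
    """Hypothetical stand-in that mimics the ISubscriptionAPI surface."""

    def __init__(
        self,
        queue: "asyncio.Queue[bytes]",
        unsubscribe_fn: Callable[[], Awaitable[None]],
    ) -> None:
        self._queue = queue
        self._unsubscribe_fn = unsubscribe_fn

    async def get(self) -> bytes:
        # Wait for the next message delivered to this subscription.
        return await self._queue.get()

    async def unsubscribe(self) -> None:
        await self._unsubscribe_fn()

    async def __aenter__(self) -> "QueueSubscription":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Leaving the `async with` block unsubscribes automatically.
        await self.unsubscribe()

    def __aiter__(self) -> AsyncIterator[bytes]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[bytes]:
        # Yield messages forever; the consumer decides when to stop.
        while True:
            yield await self.get()


async def demo() -> Tuple[List[bytes], bool]:
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    state = {"unsubscribed": False}

    async def unsubscribe_fn() -> None:
        state["unsubscribed"] = True

    queue.put_nowait(b"first")
    queue.put_nowait(b"second")

    received: List[bytes] = []
    async with QueueSubscription(queue, unsubscribe_fn) as sub:
        received.append(await sub.get())  # explicit get()
        async for msg in sub:             # async iteration
            received.append(msg)
            break
    return received, state["unsubscribed"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Tying unsubscribe to `__aexit__` means a consumer cannot forget to release the subscription when its block exits, including on error.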
libp2p.pubsub.exceptions module¶
-
exception
libp2p.pubsub.exceptions.
NoPubsubAttached
¶
-
exception
libp2p.pubsub.exceptions.
PubsubRouterError
¶
libp2p.pubsub.floodsub module¶
-
class
libp2p.pubsub.floodsub.
FloodSub
(protocols: Sequence[NewType.<locals>.new_type])¶ Bases:
libp2p.pubsub.abc.IPubsubRouter
-
add_peer
(peer_id: libp2p.peer.id.ID, protocol_id: NewType.<locals>.new_type) → None¶ Notifies the router that a new peer has been connected.
- Parameters
peer_id – id of peer to add
protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub
-
attach
(pubsub: libp2p.pubsub.pubsub.Pubsub) → None¶ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters
pubsub – pubsub instance to attach to
-
get_protocols
() → List[NewType.<locals>.new_type]¶ - Returns
the list of protocols supported by the router
-
async
handle_rpc
(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None¶ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.
- Parameters
rpc – rpc message
-
async
join
(topic: str) → None¶ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters
topic – topic to join
-
async
leave
(topic: str) → None¶ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters
topic – topic to leave
-
protocols
: List[TProtocol] = None¶
-
async
publish
(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Invoked to forward a new message that has been validated. This is where the “flooding” part of floodsub happens.
With flooding, routing is almost trivial: for each incoming message, forward to all known peers in the topic. There is a bit of logic, as the router maintains a timed cache of previous messages, so that seen messages are not forwarded again. It also never forwards a message back to the source or to the peer that forwarded the message.
- Parameters
msg_forwarder – peer ID of the peer who forwards the message to us
pubsub_msg – pubsub message in protobuf
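The flooding rule described for FloodSub.publish can be sketched in a few lines. This is an illustration under simplifying assumptions, not the library's code: `FloodRouterSketch` is a hypothetical class, peers and message ids are plain strings, the seen-cache is an unbounded set rather than a timed cache, and the method returns the target peers instead of writing to streams.

```python
from typing import Dict, Iterable, Set


class FloodRouterSketch:
    """Hypothetical model of the flooding decision only."""

    def __init__(self, peer_topics: Dict[str, Set[str]]) -> None:
        # topic -> peers known to be subscribed to that topic
        self.peer_topics = peer_topics
        # Simplification: a plain set, with no expiry of old entries.
        self.seen: Set[str] = set()

    def publish(
        self,
        msg_id: str,
        topics: Iterable[str],
        origin: str,
        msg_forwarder: str,
    ) -> Set[str]:
        # A message that was already seen is not forwarded again.
        if msg_id in self.seen:
            return set()
        self.seen.add(msg_id)

        # Collect every known peer in any of the message's topics.
        targets: Set[str] = set()
        for topic in topics:
            targets |= self.peer_topics.get(topic, set())

        # Never send back to the source or to the forwarding peer.
        return targets - {origin, msg_forwarder}


if __name__ == "__main__":
    router = FloodRouterSketch({"news": {"a", "b", "c"}, "chat": {"c", "d"}})
    print(router.publish("m1", ["news"], origin="a", msg_forwarder="b"))
```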
-
pubsub
: Pubsub = None¶
-
remove_peer
(peer_id: libp2p.peer.id.ID) → None¶ Notifies the router that a peer has been disconnected.
- Parameters
peer_id – id of peer to remove
libp2p.pubsub.gossipsub module¶
-
class
libp2p.pubsub.gossipsub.
GossipSub
(protocols: Sequence[NewType.<locals>.new_type], degree: int, degree_low: int, degree_high: int, time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120)¶ Bases:
libp2p.pubsub.abc.IPubsubRouter
,async_service.base.Service
-
add_peer
(peer_id: libp2p.peer.id.ID, protocol_id: NewType.<locals>.new_type) → None¶ Notifies the router that a new peer has been connected.
- Parameters
peer_id – id of peer to add
protocol_id – router protocol the peer speaks, e.g., floodsub, gossipsub
-
attach
(pubsub: libp2p.pubsub.pubsub.Pubsub) → None¶ Attach is invoked by the PubSub constructor to attach the router to a freshly initialized PubSub instance.
- Parameters
pubsub – pubsub instance to attach to
-
degree
: int = None¶
-
degree_high
: int = None¶
-
degree_low
: int = None¶
-
async
emit_control_message
(control_msg: libp2p.pubsub.pb.rpc_pb2.ControlMessage, to_peer: libp2p.peer.id.ID) → None¶
-
async
emit_graft
(topic: str, to_peer: libp2p.peer.id.ID) → None¶ Emit graft message, sent to to_peer, for topic.
-
async
emit_ihave
(topic: str, msg_ids: Any, to_peer: libp2p.peer.id.ID) → None¶ Emit ihave message, sent to to_peer, for topic and msg_ids.
-
async
emit_iwant
(msg_ids: Any, to_peer: libp2p.peer.id.ID) → None¶ Emit iwant message, sent to to_peer, for msg_ids.
-
async
emit_prune
(topic: str, to_peer: libp2p.peer.id.ID) → None¶ Emit prune message, sent to to_peer, for topic.
-
fanout
: Dict[str, Set[ID]] = None¶
-
fanout_heartbeat
() → None¶
-
get_protocols
() → List[NewType.<locals>.new_type]¶ - Returns
the list of protocols supported by the router
-
gossip_heartbeat
() → DefaultDict[libp2p.peer.id.ID, Dict[str, List[str]]]¶
-
async
handle_graft
(graft_msg: libp2p.pubsub.pb.rpc_pb2.ControlGraft, sender_peer_id: libp2p.peer.id.ID) → None¶
-
async
handle_ihave
(ihave_msg: libp2p.pubsub.pb.rpc_pb2.ControlIHave, sender_peer_id: libp2p.peer.id.ID) → None¶ Checks the seen set and requests unknown messages with an IWANT message.
-
async
handle_iwant
(iwant_msg: libp2p.pubsub.pb.rpc_pb2.ControlIWant, sender_peer_id: libp2p.peer.id.ID) → None¶ Forwards all requested messages that are present in mcache to the requesting peer.
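The IHAVE and IWANT handlers form a request and response pair: one side advertises message ids, the other asks for the ids it has not seen, and the first side answers from its cache. A rough sketch of those two decisions follows, assuming string message ids and a plain dict in place of the mcache. The function names and signatures here are hypothetical and do not match the methods above, which take protobuf control messages and send replies over the network.

```python
from typing import Dict, Iterable, List, Set


def ids_to_request(advertised_ids: Iterable[str], seen_ids: Set[str]) -> List[str]:
    """IHAVE side: keep only the advertised ids that are not in the seen set."""
    return [mid for mid in advertised_ids if mid not in seen_ids]


def messages_to_send(requested_ids: Iterable[str], cache: Dict[str, bytes]) -> List[bytes]:
    """IWANT side: return the requested messages that are still cached."""
    return [cache[mid] for mid in requested_ids if mid in cache]


if __name__ == "__main__":
    seen = {"m1"}
    wanted = ids_to_request(["m1", "m2", "m3"], seen)
    # The advertising peer only holds m2; m3 has already left its cache.
    print(wanted, messages_to_send(wanted, {"m2": b"payload-2"}))
```

Ids that are requested but no longer cached are skipped in this sketch rather than treated as an error.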
-
async
handle_prune
(prune_msg: libp2p.pubsub.pb.rpc_pb2.ControlPrune, sender_peer_id: libp2p.peer.id.ID) → None¶
-
async
handle_rpc
(rpc: libp2p.pubsub.pb.rpc_pb2.RPC, sender_peer_id: libp2p.peer.id.ID) → None¶ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed.
- Parameters
rpc – RPC message
sender_peer_id – id of the peer who sent the message
-
async
heartbeat
() → None¶ Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the state changes in the preceding heartbeat.
-
heartbeat_initial_delay
: float = None¶
-
heartbeat_interval
: int = None¶
-
async
join
(topic: str) → None¶ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement.
- Parameters
topic – topic to join
-
async
leave
(topic: str) → None¶ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.
- Parameters
topic – topic to leave
-
mcache
: MessageCache = None¶
-
mesh
: Dict[str, Set[ID]] = None¶
-
mesh_heartbeat
() → Tuple[DefaultDict[libp2p.peer.id.ID, List[str]], DefaultDict[libp2p.peer.id.ID, List[str]]]¶
-
pack_control_msgs
(ihave_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlIHave], graft_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlGraft], prune_msgs: List[libp2p.pubsub.pb.rpc_pb2.ControlPrune]) → libp2p.pubsub.pb.rpc_pb2.ControlMessage¶
-
peer_protocol
: Dict[ID, TProtocol] = None¶
-
protocols
: List[TProtocol] = None¶
-
async
publish
(msg_forwarder: libp2p.peer.id.ID, pubsub_msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Invoked to forward a new message that has been validated.
-
pubsub
: Pubsub = None¶
-
remove_peer
(peer_id: libp2p.peer.id.ID) → None¶ Notifies the router that a peer has been disconnected.
- Parameters
peer_id – id of peer to remove
-
async
run
() → None¶ Primary entry point for all service logic.
Note
This method should not be directly invoked by user code.
Services may be run using the following approaches.
-
static
select_from_minus
(num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]) → List[Any]¶ Randomly select at most num_to_select elements from the set (pool - minus).
- Parameters
num_to_select – number of elements to randomly select
pool – list of items to select from (excluding elements in minus)
minus – elements to be excluded from selection pool
- Returns
list of selected elements
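The behaviour described for select_from_minus can be reproduced with the standard library. The version below is a sketch written from the description above rather than a copy of the library's code. It assumes the elements are hashable and that duplicates in pool should count once.

```python
import random
from typing import Any, Iterable, List


def select_from_minus(
    num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
) -> List[Any]:
    """Pick at most num_to_select random elements of pool that are not in minus."""
    excluded = set(minus)
    # dict.fromkeys removes duplicates while keeping the original order.
    candidates = [item for item in dict.fromkeys(pool) if item not in excluded]

    # Fewer candidates than requested: return them all ("at most").
    if num_to_select >= len(candidates):
        return candidates
    return random.sample(candidates, num_to_select)


if __name__ == "__main__":
    print(select_from_minus(2, ["a", "b", "c", "d", "e"], ["b", "d"]))
```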
-
time_to_live
: int = None¶
libp2p.pubsub.mcache module¶
-
class
libp2p.pubsub.mcache.
CacheEntry
(mid: Tuple[bytes, bytes], topics: Sequence[str])¶ Bases:
object
A logical representation of an entry in the mcache’s _history_.
-
mid
: Tuple[bytes, bytes] = None¶
-
topics
: List[str] = None¶
-
class
libp2p.pubsub.mcache.
MessageCache
(window_size: int, history_size: int)¶ Bases:
object
-
get
(mid: Tuple[bytes, bytes]) → Optional[libp2p.pubsub.pb.rpc_pb2.Message]¶ Get a message from the mcache.
- Parameters
mid – (seqno, from_id) of the message to get.
- Returns
The rpc message associated with this mid
-
history
: List[List[CacheEntry]] = None¶
-
history_size
: int = None¶
-
msgs
: Dict[Tuple[bytes, bytes], rpc_pb2.Message] = None¶
-
put
(msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Put a message into the mcache.
- Parameters
msg – The rpc message to put in. Should contain seqno and from_id
-
shift
() → None¶ Shift the window over by 1 position, dropping the last element of the history.
-
window
(topic: str) → List[Tuple[bytes, bytes]]¶ Get the window for this topic.
- Parameters
topic – Topic whose message ids we desire.
- Returns
List of mids in the current window.
-
window_size
: int = None¶
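How put, shift, window and get fit together is easier to see in a small model. The sketch below is one plausible reading of the descriptions above, not the library's implementation. `Msg` is a hypothetical stand-in for rpc_pb2.Message, and two behaviours are assumptions: slot 0 of the history is the newest, and a message is evicted from msgs when its slot drops off the end.

```python
from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple

Mid = Tuple[bytes, bytes]  # (seqno, from_id), as in the get() description


class Msg(NamedTuple):
    """Hypothetical stand-in for rpc_pb2.Message."""
    seqno: bytes
    from_id: bytes
    topics: Sequence[str]
    data: bytes


class MessageCacheSketch:
    def __init__(self, window_size: int, history_size: int) -> None:
        self.window_size = window_size
        self.history_size = history_size
        self.msgs: Dict[Mid, Msg] = {}
        # Assumption: history[0] is the newest slot, history[-1] the oldest.
        self.history: List[List[Tuple[Mid, Sequence[str]]]] = [
            [] for _ in range(history_size)
        ]

    def put(self, msg: Msg) -> None:
        mid = (msg.seqno, msg.from_id)
        self.msgs[mid] = msg
        self.history[0].append((mid, msg.topics))

    def get(self, mid: Mid) -> Optional[Msg]:
        return self.msgs.get(mid)

    def window(self, topic: str) -> List[Mid]:
        # Only the newest window_size slots are eligible for gossip.
        recent = self.history[: self.window_size]
        return [mid for slot in recent for mid, topics in slot if topic in topics]

    def shift(self) -> None:
        # Assumption: messages in the dropped slot are evicted from msgs.
        for mid, _ in self.history[-1]:
            self.msgs.pop(mid, None)
        self.history = [[]] + self.history[:-1]
```

With window_size smaller than history_size, a message stops being advertised by window before it stops being retrievable with get.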
libp2p.pubsub.pubsub module¶
-
class
libp2p.pubsub.pubsub.
Pubsub
(host: libp2p.host.host_interface.IHost, router: IPubsubRouter, cache_size: int = None, strict_signing: bool = True, msg_id_constructor: Callable[[libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>)¶ Bases:
async_service.base.Service
,libp2p.pubsub.abc.IPubsub
-
async
continuously_read_stream
(stream: libp2p.network.stream.net_stream_interface.INetStream) → None¶ Read from input stream in an infinite loop. Process messages from other nodes.
- Parameters
stream – stream to continuously read from
-
counter
: int = None¶
-
dead_peer_receive_channel
: 'trio.MemoryReceiveChannel[ID]' = None¶
-
event_handle_dead_peer_queue_started
: trio.Event = None¶
-
event_handle_peer_queue_started
: trio.Event = None¶
-
get_hello_packet
() → libp2p.pubsub.pb.rpc_pb2.RPC¶ Generate a subscription message with all topics we are subscribed to. Only send the hello packet if we have subscribed topics.
-
get_msg_validators
(msg: libp2p.pubsub.pb.rpc_pb2.Message) → Tuple[libp2p.pubsub.pubsub.TopicValidator, ...]¶ Get all validators corresponding to the topics in the message.
- Parameters
msg – the message published to the topic
-
async
handle_dead_peer_queue
() → None¶ Continuously read from the dead peer channel. For each dead peer, close the stream to that peer and remove its peer info from pubsub and the pubsub router.
-
async
handle_peer_queue
() → None¶ Continuously read from the peer queue. Each time a new peer is found, open a stream to the peer using one of the pubsub protocols we support.
-
handle_subscription
(origin_id: libp2p.peer.id.ID, sub_message: libp2p.pubsub.pb.rpc_pb2.SubOpts) → None¶ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message.
- Parameters
origin_id – id of the peer who sent the subscription message
sub_message – RPC.SubOpts
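The mapping update performed by handle_subscription can be pictured as follows. This is a sketch under assumptions: `SubOptsSketch` is a hypothetical stand-in for RPC.SubOpts with a subscribe flag and a topic id, peer ids are plain strings, and the mapping has the same topic-to-peers shape as the peer_topics attribute.

```python
from typing import Dict, NamedTuple, Set


class SubOptsSketch(NamedTuple):
    """Hypothetical stand-in for RPC.SubOpts."""
    subscribe: bool
    topicid: str


def handle_subscription(
    peer_topics: Dict[str, Set[str]], origin_id: str, sub_message: SubOptsSketch
) -> None:
    topic = sub_message.topicid
    if sub_message.subscribe:
        # Mark the peer as subscribed, creating the topic entry if needed.
        peer_topics.setdefault(topic, set()).add(origin_id)
    elif topic in peer_topics:
        # Mark the peer as unsubscribed; unknown topics are ignored.
        peer_topics[topic].discard(origin_id)


if __name__ == "__main__":
    mapping: Dict[str, Set[str]] = {}
    handle_subscription(mapping, "peer-a", SubOptsSketch(True, "news"))
    handle_subscription(mapping, "peer-b", SubOptsSketch(True, "news"))
    handle_subscription(mapping, "peer-a", SubOptsSketch(False, "news"))
    print(mapping)
```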
-
host
: IHost = None¶
-
async
message_all_peers
(raw_msg: bytes) → None¶ Broadcast a message to peers.
- Parameters
raw_msg – raw contents of the message to broadcast
-
property
my_id
¶
-
notify_subscriptions
(publish_message: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Put incoming message from a peer onto my blocking queue.
- Parameters
publish_message – RPC.Message format
-
peer_receive_channel
: 'trio.MemoryReceiveChannel[ID]' = None¶
-
peer_topics
: Dict[str, Set[ID]] = None¶
-
peers
: Dict[ID, INetStream] = None¶
-
property
protocols
¶
-
async
publish
(topic_id: str, data: bytes) → None¶ Publish data to a topic.
- Parameters
topic_id – topic which we are going to publish the data to
data – data which we are publishing
-
async
push_msg
(msg_forwarder: libp2p.peer.id.ID, msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Push a pubsub message to others.
- Parameters
msg_forwarder – the peer who forwarded us the message.
msg – the message we are going to push out.
-
remove_topic_validator
(topic: str) → None¶ Remove the validator from the given topic.
- Parameters
topic – the topic to remove validator from
-
router
: 'IPubsubRouter' = None¶
-
async
run
() → None¶ Primary entry point for all service logic.
Note
This method should not be directly invoked by user code.
Services may be run using the following approaches.
-
seen_messages
: LRU = None¶
-
set_topic_validator
(topic: str, validator: Union[Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], bool], Callable[[libp2p.peer.id.ID, libp2p.pubsub.pb.rpc_pb2.Message], Awaitable[bool]]], is_async_validator: bool) → None¶ Register a validator under the given topic. One topic can only have one validator.
- Parameters
topic – the topic to register validator under
validator – the validator used to validate messages published to the topic
is_async_validator – indicate if the validator is an asynchronous validator
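The is_async_validator flag exists because a validator may be either a plain function or a coroutine function, and the caller has to know whether to await it. The sketch below illustrates that dispatch with a hypothetical `ValidatorRegistry` class. It is not the library's code: it uses asyncio instead of trio, takes plain strings and bytes instead of ID and rpc_pb2.Message, returns a bool, and assumes that a topic without a validator accepts every message.

```python
import asyncio
from typing import Any, Callable, Dict, NamedTuple


class TopicValidator(NamedTuple):
    validator: Callable[..., Any]  # field number 0
    is_async: bool                 # field number 1


class ValidatorRegistry:
    """Hypothetical registry mirroring set/remove_topic_validator."""

    def __init__(self) -> None:
        self.topic_validators: Dict[str, TopicValidator] = {}

    def set_topic_validator(
        self, topic: str, validator: Callable[..., Any], is_async_validator: bool
    ) -> None:
        # One validator per topic: registering again replaces the old one.
        self.topic_validators[topic] = TopicValidator(validator, is_async_validator)

    def remove_topic_validator(self, topic: str) -> None:
        self.topic_validators.pop(topic, None)

    async def validate(self, topic: str, peer_id: str, msg: bytes) -> bool:
        entry = self.topic_validators.get(topic)
        if entry is None:
            return True  # Assumption: no validator means "accept".
        if entry.is_async:
            return await entry.validator(peer_id, msg)
        return entry.validator(peer_id, msg)


async def non_empty(peer_id: str, msg: bytes) -> bool:
    # Example asynchronous validator: reject empty payloads.
    return len(msg) > 0


if __name__ == "__main__":
    registry = ValidatorRegistry()
    registry.set_topic_validator("news", non_empty, True)
    print(asyncio.run(registry.validate("news", "peer-a", b"hello")))
```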
-
sign_key
: PrivateKey = None¶
-
async
stream_handler
(stream: libp2p.network.stream.net_stream_interface.INetStream) → None¶ Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols.
- Parameters
stream – newly created stream
-
strict_signing
: bool = None¶
-
async
subscribe
(topic_id: str) → libp2p.pubsub.abc.ISubscriptionAPI¶ Subscribe ourselves to a topic.
- Parameters
topic_id – topic_id to subscribe to
-
subscribed_topics_receive
: Dict[str, 'TrioSubscriptionAPI'] = None¶
-
subscribed_topics_send
: Dict[str, 'trio.MemorySendChannel[rpc_pb2.Message]'] = None¶
-
property
topic_ids
¶
-
topic_validators
: Dict[str, TopicValidator] = None¶
-
async
unsubscribe
(topic_id: str) → None¶ Unsubscribe ourselves from a topic.
- Parameters
topic_id – topic_id to unsubscribe from
-
async
validate_msg
(msg_forwarder: libp2p.peer.id.ID, msg: libp2p.pubsub.pb.rpc_pb2.Message) → None¶ Validate the received message.
- Parameters
msg_forwarder – the peer who forwarded us the message.
msg – the message.
-
async
wait_until_ready
() → None¶
-
class
libp2p.pubsub.pubsub.
TopicValidator
(validator, is_async)¶ Bases:
tuple
-
property
is_async
¶ Alias for field number 1
-
property
validator
¶ Alias for field number 0
-
libp2p.pubsub.pubsub.
get_content_addressed_msg_id
(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bytes¶
-
libp2p.pubsub.pubsub.
get_peer_and_seqno_msg_id
(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bytes¶
libp2p.pubsub.pubsub_notifee module¶
-
class
libp2p.pubsub.pubsub_notifee.
PubsubNotifee
(initiator_peers_queue: trio.MemorySendChannel[ID], dead_peers_queue: trio.MemorySendChannel[ID])¶ Bases:
libp2p.network.notifee_interface.INotifee
-
async
closed_stream
(network: libp2p.network.network_interface.INetwork, stream: libp2p.network.stream.net_stream_interface.INetStream) → None¶ - Parameters
network – network the stream was closed on
stream – stream that was closed
-
async
connected
(network: libp2p.network.network_interface.INetwork, conn: libp2p.network.connection.net_connection_interface.INetConn) → None¶ Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream. We only want to have one pubsub stream with each peer.
- Parameters
network – network the connection was opened on
conn – connection that was opened
-
dead_peers_queue
: 'trio.MemorySendChannel[ID]' = None¶
-
async
disconnected
(network: libp2p.network.network_interface.INetwork, conn: libp2p.network.connection.net_connection_interface.INetConn) → None¶ Add peer_id to dead_peers_queue, so that pubsub and its router can remove this peer_id and close the stream in between.
- Parameters
network – network the connection was closed on
conn – connection that was closed
-
initiator_peers_queue
: 'trio.MemorySendChannel[ID]' = None¶
-
async
listen
(network: libp2p.network.network_interface.INetwork, multiaddr: multiaddr.multiaddr.Multiaddr) → None¶ - Parameters
network – network the listener is listening on
multiaddr – multiaddress listener is listening on
-
async
listen_close
(network: libp2p.network.network_interface.INetwork, multiaddr: multiaddr.multiaddr.Multiaddr) → None¶ - Parameters
network – network the listener was listening on
multiaddr – multiaddress listener is no longer listening on
-
async
opened_stream
(network: libp2p.network.network_interface.INetwork, stream: libp2p.network.stream.net_stream_interface.INetStream) → None¶ - Parameters
network – network the stream was opened on
stream – stream that was opened
libp2p.pubsub.subscription module¶
-
class
libp2p.pubsub.subscription.
BaseSubscriptionAPI
¶
-
class
libp2p.pubsub.subscription.
TrioSubscriptionAPI
(receive_channel: trio.MemoryReceiveChannel[rpc_pb2.Message], unsubscribe_fn: Callable[[], Awaitable[None]])¶ Bases:
libp2p.pubsub.subscription.BaseSubscriptionAPI
-
async
get
() → libp2p.pubsub.pb.rpc_pb2.Message¶
-
receive_channel
: 'trio.MemoryReceiveChannel[rpc_pb2.Message]' = None¶
-
async
unsubscribe
() → None¶
-
unsubscribe_fn
: UnsubscribeFn = None¶
libp2p.pubsub.validators module¶
-
libp2p.pubsub.validators.
signature_validator
(msg: libp2p.pubsub.pb.rpc_pb2.Message) → bool¶ Verify the message against the given public key.
- Parameters
pubkey – the public key which signs the message.
msg – the message signed.