libp2p.tools package

Subpackages

Submodules

libp2p.tools.constants module

class libp2p.tools.constants.GossipsubParams(degree, degree_low, degree_high, time_to_live, gossip_window, gossip_history, heartbeat_initial_delay, heartbeat_interval)

Bases: NamedTuple

degree: int

Alias for field number 0

degree_high: int

Alias for field number 2

degree_low: int

Alias for field number 1

gossip_history: int

Alias for field number 5

gossip_window: int

Alias for field number 4

heartbeat_initial_delay: float

Alias for field number 6

heartbeat_interval: float

Alias for field number 7

time_to_live: int

Alias for field number 3

libp2p.tools.factories module

class libp2p.tools.factories.DummyRouter

Bases: IPeerRouting

async find_peer(peer_id: ID) PeerInfo

Find specific Peer FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo with relevant addresses.

class libp2p.tools.factories.FloodsubFactory(**kwargs)

Bases: Factory

protocols = ('/floodsub/1.0.0',)
class libp2p.tools.factories.GossipsubFactory(**kwargs)

Bases: Factory

degree = 10
degree_high = 11
degree_low = 9
gossip_history = 5
gossip_window = 3
heartbeat_initial_delay = 0.1
heartbeat_interval = 0.5
protocols = ('/meshsub/1.0.0',)
time_to_live = 30
class libp2p.tools.factories.HostFactory(**kwargs)

Bases: Factory

classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[BasicHost, ...]]
network = <factory.declarations.LazyAttribute object>
class libp2p.tools.factories.IDFactory(**kwargs)

Bases: Factory

peer_id_bytes = <factory.declarations.LazyFunction object>
class libp2p.tools.factories.PubsubFactory(**kwargs)

Bases: Factory

cache_size = None
classmethod create_and_start(host: IHost, router: IPubsubRouter, cache_size: int, strict_signing: bool, msg_id_constructor: Callable[[Message], bytes] = None) AsyncIterator[Pubsub]
classmethod create_batch_with_floodsub(number: int, cache_size: int = None, strict_signing: bool = False, protocols: ~typing.Sequence[TProtocol] = None, security_protocol: TProtocol = None, muxer_opt: ~typing.Mapping[TProtocol, ~typing.Type[~libp2p.stream_muxer.abc.IMuxedConn]] = None, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>) AsyncIterator[Tuple[Pubsub, ...]]
classmethod create_batch_with_gossipsub(number: int, *, cache_size: int = None, strict_signing: bool = False, protocols: ~typing.Sequence[TProtocol] = None, degree: int = 10, degree_low: int = 9, degree_high: int = 11, time_to_live: int = 30, gossip_window: int = 3, gossip_history: int = 5, heartbeat_interval: float = 0.5, heartbeat_initial_delay: float = 0.1, security_protocol: TProtocol = None, muxer_opt: ~typing.Mapping[TProtocol, ~typing.Type[~libp2p.stream_muxer.abc.IMuxedConn]] = None, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>) AsyncIterator[Tuple[Pubsub, ...]]
host = <factory.declarations.SubFactory object>
router = None
strict_signing = False
class libp2p.tools.factories.RoutedHostFactory(**kwargs)

Bases: Factory

classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[RoutedHost, ...]]
network = <factory.declarations.LazyAttribute object>
router = <factory.declarations.LazyFunction object>
class libp2p.tools.factories.SwarmFactory(**kwargs)

Bases: Factory

classmethod create_and_listen(key_pair: KeyPair = None, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Swarm]
classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[Swarm, ...]]
peer_id = <factory.declarations.LazyAttribute object>
peerstore = <factory.declarations.LazyAttribute object>
transport = <factory.declarations.LazyFunction object>
upgrader = <factory.declarations.LazyAttribute object>
libp2p.tools.factories.default_key_pair_factory() KeyPair
libp2p.tools.factories.default_muxer_transport_factory() Mapping[TProtocol, Type[IMuxedConn]]
libp2p.tools.factories.host_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[BasicHost, BasicHost]]
libp2p.tools.factories.initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) PeerStore
libp2p.tools.factories.mplex_conn_pair_factory(security_protocol: TProtocol = None) AsyncIterator[Tuple[Mplex, Mplex]]
libp2p.tools.factories.mplex_stream_pair_factory(security_protocol: TProtocol = None) AsyncIterator[Tuple[MplexStream, MplexStream]]
libp2p.tools.factories.mplex_transport_factory() Mapping[TProtocol, Type[IMuxedConn]]
libp2p.tools.factories.net_stream_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[INetStream, INetStream]]
libp2p.tools.factories.noise_conn_factory(nursery: Nursery) AsyncIterator[Tuple[ISecureConn, ISecureConn]]
libp2p.tools.factories.noise_handshake_payload_factory() NoiseHandshakePayload
libp2p.tools.factories.noise_static_key_factory() PrivateKey
libp2p.tools.factories.noise_transport_factory(key_pair: KeyPair) ISecureTransport
libp2p.tools.factories.plaintext_transport_factory(key_pair: KeyPair) ISecureTransport
libp2p.tools.factories.raw_conn_factory(nursery: Nursery) AsyncIterator[Tuple[IRawConnection, IRawConnection]]
libp2p.tools.factories.secio_transport_factory(key_pair: KeyPair) ISecureTransport
libp2p.tools.factories.security_options_factory_factory(protocol_id: TProtocol | None = None) Callable[[KeyPair], Mapping[TProtocol, ISecureTransport]]
libp2p.tools.factories.swarm_conn_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[SwarmConn, SwarmConn]]
libp2p.tools.factories.swarm_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[Swarm, Swarm]]

libp2p.tools.utils module

async libp2p.tools.utils.connect(node1: IHost, node2: IHost) None

Connect node1 to node2.

async libp2p.tools.utils.connect_swarm(swarm_0: Swarm, swarm_1: Swarm) None
libp2p.tools.utils.create_echo_stream_handler(ack_prefix: str) Callable[[INetStream], Awaitable[None]]

Module contents