libp2p.tools package
Subpackages
- libp2p.tools.pubsub package
- Submodules
- libp2p.tools.pubsub.dummy_account_node module
DummyAccountNode
DummyAccountNode.create()
DummyAccountNode.get_balance()
DummyAccountNode.handle_incoming_msgs()
DummyAccountNode.handle_send_crypto()
DummyAccountNode.handle_set_crypto()
DummyAccountNode.host
DummyAccountNode.publish_send_crypto()
DummyAccountNode.publish_set_crypto()
DummyAccountNode.pubsub
DummyAccountNode.run()
- libp2p.tools.pubsub.floodsub_integration_test_settings module
- libp2p.tools.pubsub.utils module
- Module contents
Submodules
libp2p.tools.constants module
- class libp2p.tools.constants.GossipsubParams(degree, degree_low, degree_high, time_to_live, gossip_window, gossip_history, heartbeat_initial_delay, heartbeat_interval)
Bases:
NamedTuple
libp2p.tools.factories module
- class libp2p.tools.factories.DummyRouter
Bases:
IPeerRouting
- class libp2p.tools.factories.FloodsubFactory(**kwargs)
Bases:
Factory
- protocols = ('/floodsub/1.0.0',)
- class libp2p.tools.factories.GossipsubFactory(**kwargs)
Bases:
Factory
- degree = 10
- degree_high = 11
- degree_low = 9
- gossip_history = 5
- gossip_window = 3
- heartbeat_initial_delay = 0.1
- heartbeat_interval = 0.5
- protocols = ('/meshsub/1.0.0',)
- time_to_live = 30
- class libp2p.tools.factories.HostFactory(**kwargs)
Bases:
Factory
- classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[BasicHost, ...]]
- network = <factory.declarations.LazyAttribute object>
- class libp2p.tools.factories.IDFactory(**kwargs)
Bases:
Factory
- peer_id_bytes = <factory.declarations.LazyFunction object>
- class libp2p.tools.factories.PubsubFactory(**kwargs)
Bases:
Factory
- cache_size = None
- classmethod create_and_start(host: IHost, router: IPubsubRouter, cache_size: int, strict_signing: bool, msg_id_constructor: Callable[[Message], bytes] = None) AsyncIterator[Pubsub]
- classmethod create_batch_with_floodsub(number: int, cache_size: int = None, strict_signing: bool = False, protocols: ~typing.Sequence[TProtocol] = None, security_protocol: TProtocol = None, muxer_opt: ~typing.Mapping[TProtocol, ~typing.Type[~libp2p.stream_muxer.abc.IMuxedConn]] = None, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>) AsyncIterator[Tuple[Pubsub, ...]]
- classmethod create_batch_with_gossipsub(number: int, *, cache_size: int = None, strict_signing: bool = False, protocols: ~typing.Sequence[TProtocol] = None, degree: int = 10, degree_low: int = 9, degree_high: int = 11, time_to_live: int = 30, gossip_window: int = 3, gossip_history: int = 5, heartbeat_interval: float = 0.5, heartbeat_initial_delay: float = 0.1, security_protocol: TProtocol = None, muxer_opt: ~typing.Mapping[TProtocol, ~typing.Type[~libp2p.stream_muxer.abc.IMuxedConn]] = None, msg_id_constructor: ~typing.Callable[[~libp2p.pubsub.pb.rpc_pb2.Message], bytes] = <function get_peer_and_seqno_msg_id>) AsyncIterator[Tuple[Pubsub, ...]]
- host = <factory.declarations.SubFactory object>
- router = None
- strict_signing = False
- class libp2p.tools.factories.RoutedHostFactory(**kwargs)
Bases:
Factory
- classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[RoutedHost, ...]]
- network = <factory.declarations.LazyAttribute object>
- router = <factory.declarations.LazyFunction object>
- class libp2p.tools.factories.SwarmFactory(**kwargs)
Bases:
Factory
- classmethod create_and_listen(key_pair: KeyPair = None, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Swarm]
- classmethod create_batch_and_listen(number: int, security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[Swarm, ...]]
- peer_id = <factory.declarations.LazyAttribute object>
- peerstore = <factory.declarations.LazyAttribute object>
- transport = <factory.declarations.LazyFunction object>
- upgrader = <factory.declarations.LazyAttribute object>
- libp2p.tools.factories.default_muxer_transport_factory() Mapping[TProtocol, Type[IMuxedConn]]
- libp2p.tools.factories.host_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[BasicHost, BasicHost]]
- libp2p.tools.factories.initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) PeerStore
- libp2p.tools.factories.mplex_conn_pair_factory(security_protocol: TProtocol = None) AsyncIterator[Tuple[Mplex, Mplex]]
- libp2p.tools.factories.mplex_stream_pair_factory(security_protocol: TProtocol = None) AsyncIterator[Tuple[MplexStream, MplexStream]]
- libp2p.tools.factories.mplex_transport_factory() Mapping[TProtocol, Type[IMuxedConn]]
- libp2p.tools.factories.net_stream_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[INetStream, INetStream]]
- libp2p.tools.factories.noise_conn_factory(nursery: Nursery) AsyncIterator[Tuple[ISecureConn, ISecureConn]]
- libp2p.tools.factories.noise_handshake_payload_factory() NoiseHandshakePayload
- libp2p.tools.factories.noise_static_key_factory() PrivateKey
- libp2p.tools.factories.noise_transport_factory(key_pair: KeyPair) ISecureTransport
- libp2p.tools.factories.plaintext_transport_factory(key_pair: KeyPair) ISecureTransport
- libp2p.tools.factories.raw_conn_factory(nursery: Nursery) AsyncIterator[Tuple[IRawConnection, IRawConnection]]
- libp2p.tools.factories.secio_transport_factory(key_pair: KeyPair) ISecureTransport
- libp2p.tools.factories.security_options_factory_factory(protocol_id: TProtocol | None = None) Callable[[KeyPair], Mapping[TProtocol, ISecureTransport]]
- libp2p.tools.factories.swarm_conn_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[SwarmConn, SwarmConn]]
- libp2p.tools.factories.swarm_pair_factory(security_protocol: TProtocol = None, muxer_opt: Mapping[TProtocol, Type[IMuxedConn]] = None) AsyncIterator[Tuple[Swarm, Swarm]]