libp2p.stream_muxer.mplex package

Submodules

libp2p.stream_muxer.mplex.constants module

class libp2p.stream_muxer.mplex.constants.HeaderTags(value)

Bases: Enum

Enumeration of mplex message header tags. A tag is passed as the flag argument of Mplex.send_message.

CloseInitiator = 4
CloseReceiver = 3
MessageInitiator = 2
MessageReceiver = 1
NewStream = 0
ResetInitiator = 6
ResetReceiver = 5
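As an illustration of how these tags pair up by role, the sketch below defines a local stand-in enum with the values listed above. The helper message_flag is hypothetical and is not part of the documented API; it only shows one way the Initiator and Receiver variants might be selected.

```python
from enum import Enum


class HeaderTags(Enum):
    # Local stand-in that copies the values listed above.
    NewStream = 0
    MessageReceiver = 1
    MessageInitiator = 2
    CloseReceiver = 3
    CloseInitiator = 4
    ResetReceiver = 5
    ResetInitiator = 6


def message_flag(is_initiator: bool) -> HeaderTags:
    # Hypothetical helper: pick the Message* tag that matches the local role.
    if is_initiator:
        return HeaderTags.MessageInitiator
    return HeaderTags.MessageReceiver


# Look up a member by its integer value, then pick a tag by role.
print(HeaderTags(4))
print(message_flag(True).value)
```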

libp2p.stream_muxer.mplex.datastructures module

class libp2p.stream_muxer.mplex.datastructures.StreamID(channel_id, is_initiator)

Bases: NamedTuple

channel_id: int

Alias for field number 0

is_initiator: bool

Alias for field number 1
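Because StreamID is a NamedTuple, it is hashable and can serve as a dictionary key, which matches how the Mplex.streams attribute is typed. The following is a minimal sketch using a local stand-in class; the string values stored in the dictionary are placeholders for illustration only.

```python
from typing import NamedTuple


class StreamID(NamedTuple):
    # Local stand-in with the two documented fields.
    channel_id: int
    is_initiator: bool


# The same channel_id can appear twice, once per direction,
# because is_initiator is part of the key.
streams = {}
streams[StreamID(channel_id=0, is_initiator=True)] = "outbound"
streams[StreamID(channel_id=0, is_initiator=False)] = "inbound"

# Fields are reachable by name, by index, or by unpacking.
channel_id, is_initiator = StreamID(7, True)
print(len(streams), channel_id, is_initiator)
```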

libp2p.stream_muxer.mplex.exceptions module

exception libp2p.stream_muxer.mplex.exceptions.MplexError

Bases: MuxedConnError

exception libp2p.stream_muxer.mplex.exceptions.MplexStreamClosed

Bases: MuxedStreamClosed

exception libp2p.stream_muxer.mplex.exceptions.MplexStreamEOF

Bases: MuxedStreamEOF

exception libp2p.stream_muxer.mplex.exceptions.MplexStreamReset

Bases: MuxedStreamReset

exception libp2p.stream_muxer.mplex.exceptions.MplexUnavailable

Bases: MuxedConnUnavailable

libp2p.stream_muxer.mplex.mplex module

class libp2p.stream_muxer.mplex.mplex.Mplex(secured_conn: ISecureConn, peer_id: ID, on_close: Callable[[], Awaitable[Any]] | None = None)

Bases: IMuxedConn

reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go

async accept_stream() IMuxedStream

Accept a muxed stream opened by the other end.

async close() None

Close the stream muxer and underlying secured connection.

event_closed: Event
event_shutting_down: Event
event_started: Event
get_remote_address() tuple[str, int] | None

Return the remote address by delegating to the underlying secured_conn.

async handle_incoming() None

Read a message off of the secured connection and add it to the corresponding message buffer.

property is_closed: bool

Check whether the connection is fully closed.

Returns:

True if the connection is fully closed, otherwise False.

property is_initiator: bool

Determine if this connection is the initiator.

Returns:

True if this connection initiated the connection, otherwise False.

new_stream_receive_channel: MemoryReceiveChannel[IMuxedStream]
new_stream_send_channel: MemorySendChannel[IMuxedStream]
next_channel_id: int
on_close: Callable[[], Awaitable[Any]] | None
async open_stream() IMuxedStream

Create a new muxed stream.

Returns:

a new MplexStream

peer_id: ID
async read_message() tuple[int, int, bytes]

Read a single message off of the secured connection.

Returns:

stream_id, flag, message contents
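For orientation, the mplex wire format describes each message as a varint header, a varint length, and the payload, where the header packs the channel id and the flag as (id << 3) | flag. The sketch below decodes one such frame from an in-memory buffer. The function names decode_uvarint and decode_frame are assumptions made for this example and are not claimed to match the library's internals, which read from the secured connection instead.

```python
from __future__ import annotations


def decode_uvarint(buf: bytes, pos: int = 0) -> tuple[int, int]:
    """Return (value, next position) for the unsigned varint at pos."""
    value = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            return value, pos
        shift += 7


def decode_frame(buf: bytes) -> tuple[int, int, bytes]:
    """Split one frame into (channel id, flag, message contents)."""
    header, pos = decode_uvarint(buf)
    length, pos = decode_uvarint(buf, pos)
    channel_id = header >> 3   # upper bits carry the channel id
    flag = header & 0x07       # lower three bits carry the flag
    return channel_id, flag, buf[pos:pos + length]


print(decode_frame(b"\x1a\x02hi"))
```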

secured_conn: ISecureConn
async send_message(flag: HeaderTags, data: bytes | None, stream_id: StreamID) int

Send a message over the connection.

Parameters:
  • flag – header to use

  • data – data to send in the message

  • stream_id – stream the message is in
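To make the three parameters concrete, the sketch below builds the bytes of one frame following the mplex wire format: a varint header of (channel id << 3) | flag, a varint payload length, then the payload. The names encode_uvarint and encode_frame are hypothetical, and treating data=None as an empty payload is an assumption of this example, not a statement about the library's behavior.

```python
from typing import Optional


def encode_uvarint(value: int) -> bytes:
    """Encode a non-negative integer as an unsigned varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_frame(channel_id: int, flag: int, data: Optional[bytes]) -> bytes:
    """Build header + length + payload for a single message."""
    payload = data or b""  # assumption: None is sent as an empty payload
    header = (channel_id << 3) | flag
    return encode_uvarint(header) + encode_uvarint(len(payload)) + payload


# Channel 3 with flag 2 (MessageInitiator in the enum above).
print(encode_frame(3, 2, b"hi"))
```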

async start() None

Start the multiplexer.

streams: dict[StreamID, MplexStream]
streams_lock: Lock
streams_msg_channels: dict[StreamID, trio.MemorySendChannel[bytes]]
async write_to_stream(_bytes: bytes) None

Write a byte array to the underlying secured connection.

Parameters:

_bytes – byte array to write

Returns:

length written

libp2p.stream_muxer.mplex.mplex_stream module

class libp2p.stream_muxer.mplex.mplex_stream.MplexStream(name: str, stream_id: StreamID, muxed_conn: Mplex, incoming_data_channel: trio.MemoryReceiveChannel[bytes])

Bases: IMuxedStream

reference: https://github.com/libp2p/go-mplex/blob/master/stream.go

async close() None

Close the stream for writing. The remote end is closed for reading, but the remote side can still write in the other direction.

close_lock: Lock
event_local_closed: Event
event_remote_closed: Event
event_reset: Event
get_remote_address() tuple[str, int] | None

Return the remote address by delegating to the parent Mplex connection.

incoming_data_channel: trio.MemoryReceiveChannel[bytes]
property is_initiator: bool
muxed_conn: Mplex
name: str
async read(n: int | None = None) bytes

Read up to n bytes. A read may return fewer than n bytes if the Mplex buffer does not hold enough data. If n is None, read until EOF.

Parameters:

n – number of bytes to read

Returns:

bytes actually read

Raises:

read_deadline: int | None
async reset() None

Close both ends of the stream and tell the remote side to hang up.

rw_lock: ReadWriteLock
set_deadline(ttl: int) bool

Set deadline for both read and write operations on the muxed stream.

The deadline is enforced for the entire operation including lock acquisition. If the operation takes longer than the specified timeout, a TimeoutError is raised.

Parameters:

ttl – timeout in seconds for read and write operations

Returns:

True if successful, False if ttl is invalid (negative)
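The deadline semantics described above (a negative ttl is rejected, and the timeout covers lock acquisition as well as the operation itself) can be sketched as follows. The type annotations in this reference suggest the library is built on trio; this example uses asyncio from the standard library only so that it runs without extra dependencies. The DeadlineDemo class and its methods are hypothetical stand-ins, not the MplexStream implementation.

```python
import asyncio


class DeadlineDemo:
    """Hypothetical stand-in that mimics the documented deadline behavior."""

    def __init__(self) -> None:
        self.deadline = None  # seconds, or None for no deadline
        self.lock = asyncio.Lock()

    def set_deadline(self, ttl: int) -> bool:
        if ttl < 0:
            return False  # invalid ttl: leave the current deadline unchanged
        self.deadline = ttl
        return True

    async def _locked(self, delay: float) -> str:
        # Lock acquisition happens inside the timed region.
        async with self.lock:
            await asyncio.sleep(delay)
            return "done"

    async def operation(self, delay: float) -> str:
        try:
            return await asyncio.wait_for(self._locked(delay), timeout=self.deadline)
        except asyncio.TimeoutError:
            raise TimeoutError("operation exceeded deadline") from None


async def main() -> list:
    demo = DeadlineDemo()
    results = [demo.set_deadline(-1), demo.set_deadline(1)]
    results.append(await demo.operation(0.01))
    demo.deadline = 0.05  # set directly only to keep the demo fast
    try:
        await demo.operation(1.0)
    except TimeoutError:
        results.append("timeout")
    return results


outcome = asyncio.run(main())
print(outcome)
```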

set_read_deadline(ttl: int) bool

Set read deadline for muxed stream.

The deadline is enforced for the entire read operation including lock acquisition. If the read operation takes longer than the specified timeout, a TimeoutError is raised.

Parameters:

ttl – timeout in seconds for read operations

Returns:

True if successful, False if ttl is invalid (negative)

set_write_deadline(ttl: int) bool

Set write deadline for muxed stream.

The deadline is enforced for the entire write operation including lock acquisition. If the write operation takes longer than the specified timeout, a TimeoutError is raised.

Parameters:

ttl – timeout in seconds for write operations

Returns:

True if successful, False if ttl is invalid (negative)

stream_id: StreamID
async write(data: bytes) None

Write to stream.

Parameters:

data – bytes to write

Raises:

write_deadline: int | None

Module contents