libp2p.network.stream package

Submodules

libp2p.network.stream.exceptions module

exception libp2p.network.stream.exceptions.StreamClosed

Bases: StreamError

exception libp2p.network.stream.exceptions.StreamEOF

Bases: StreamError, EOFError

exception libp2p.network.stream.exceptions.StreamError

Bases: IOException

exception libp2p.network.stream.exceptions.StreamReset

Bases: StreamError

libp2p.network.stream.net_stream module

class libp2p.network.stream.net_stream.NetStream(muxed_stream: IMuxedStream, swarm_conn: SwarmConn | None)

Bases: INetStream

A Network stream implementation with comprehensive state management.

NetStream wraps a muxed stream and provides proper state tracking, resource cleanup, and event notification capabilities with thread-safe operations.

State Machine

[INIT] → OPEN → CLOSE_READ → CLOSE_BOTH → [CLEANUP]
              ↓         ↗           ↗
         CLOSE_WRITE → ←          ↗
              ↓                   ↗
            RESET ────────────────┘
              ↓
            ERROR ────────────────┘

State Transitions

  • INIT → OPEN: Stream establishment

  • OPEN → CLOSE_READ: EOF encountered during read() or explicit close_read()

  • OPEN → CLOSE_WRITE: Explicit close_write() call

  • OPEN → RESET: reset() call or critical stream error

  • OPEN → ERROR: Unexpected error during I/O operations

  • CLOSE_READ → CLOSE_BOTH: Explicit close_write() call

  • CLOSE_WRITE → CLOSE_BOTH: EOF encountered during read()

  • Any state → ERROR: Unrecoverable error condition

  • Any state → RESET: reset() call (for cleanup)

Terminal States: RESET and ERROR are terminal - no further transitions

Stream States

INIT: Stream is created but not yet established OPEN: Stream is established and ready for I/O operations CLOSE_READ: Read side is closed, write side may still be open CLOSE_WRITE: Write side is closed, read side may still be open CLOSE_BOTH: Both sides are closed, stream is terminated RESET: Stream was reset by remote peer or locally ERROR: Stream encountered an unrecoverable error

Operation Validity by State

OPEN: read() ✓ write() ✓ close_read() ✓ close_write() ✓ reset() ✓ CLOSE_READ: read() ✗ write() ✓ close_read() ✓ close_write() ✓ reset() ✓ CLOSE_WRITE: read() ✓ write() ✗ close_read() ✓ close_write() ✓ reset() ✓ CLOSE_BOTH: read() ✗ write() ✗ close_read() ✓ close_write() ✓ reset() ✓ RESET: read() ✗ write() ✗ close_read() ✗ close_write() ✗ reset() ✓ ERROR: read() ✗ write() ✗ close_read() ✗ close_write() ✗ reset() ✓

Thread Safety

All state operations are protected by trio.Lock() for safe concurrent access. State checks and modifications are atomic operations preventing race conditions.

QUIC Compatibility

Half-closed states (CLOSE_READ, CLOSE_WRITE) are essential for QUIC transport where streams can independently close read or write sides.

Error Handling

ERROR state is triggered by unexpected exceptions during I/O operations. Known exceptions (EOF, Reset, etc.) are handled gracefully without ERROR state. Recovery from ERROR state is possible but not guaranteed.

param muxed_stream:

The underlying muxed stream

param swarm_conn:

Optional swarm connection for stream management

async close() None

Close stream completely and clean up resources.

async close_read() None

Close the stream for reading only.

async close_write() None

Close the stream for writing only.

get_protocol() TProtocol | None
Returns:

protocol id that stream runs on

get_remote_address() tuple[str, int] | None

Delegate to the underlying muxed stream.

async is_operational() bool

Check if stream is in an operational state.

Returns:

True if stream can perform I/O operations

muxed_stream: IMuxedStream
protocol_id: TProtocol | None
async read(n: int | None = None) bytes

Read from stream.

Parameters:

n – number of bytes to read

Returns:

Bytes read from the stream

async remove() None

Remove the stream from the connection and notify swarm that stream was closed.

async reset() None

Reset stream.

set_protocol(protocol_id: TProtocol) None
Parameters:

protocol_id – protocol id that stream runs on

async set_state(state: StreamState) None

Set the current state of the stream.

Parameters:

state – new state of the stream

property state: StreamState
Returns:

current state of the stream

async write(data: bytes) None

Write to stream.

Parameters:

data – bytes to write

class libp2p.network.stream.net_stream.StreamState(value)

Bases: Enum

An enumeration.

CLOSE_BOTH = 5
CLOSE_READ = 3
CLOSE_WRITE = 4
ERROR = 7
INIT = 1
OPEN = 2
RESET = 6

Module contents