libp2p.transport.quic package

Connection ID Management

The QUIC transport implementation manages Connection IDs (CIDs) through a dedicated registry whose design is inspired by the quinn Rust QUIC library. The registry routes incoming packets to the correct connection and handles connection migration scenarios.

Key Features

Sequence Number Tracking

Each Connection ID is assigned a sequence number, starting at 0 for the initial CID. Sequence numbers are used to ensure proper retirement ordering per the QUIC specification.

Initial vs. Established CIDs

Initial CIDs (used during handshake) are tracked separately from established connection CIDs. This separation allows for efficient packet routing and proper handling of handshake packets.

Fallback Routing

When packets arrive with new Connection IDs before ConnectionIdIssued events are processed, the system uses O(1) fallback routing based on address mappings. This handles race conditions gracefully and ensures packets are routed correctly.

Retirement Ordering

Connection IDs are retired in sequence order, ensuring compliance with the QUIC specification. The ConnectionIDRegistry maintains sequence number mappings to enable proper retirement.

Architecture

The ConnectionIDRegistry class manages all Connection ID routing state:

  • Established connections: Maps Connection IDs to QUICConnection instances

  • Pending connections: Maps Connection IDs to QuicConnection (aioquic) instances during handshake

  • Initial CIDs: Separate tracking for handshake packet routing

  • Sequence tracking: Maps Connection IDs to sequence numbers and connections to sequence ranges

  • Address mappings: Bidirectional mappings between Connection IDs and addresses for O(1) fallback routing
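The routing state listed above can be illustrated with a small stand-alone sketch. This is not the actual ConnectionIDRegistry: the class name ToyCIDRegistry, its attributes, and its methods are hypothetical and only show how CID lookup, address-based fallback, and sequence-ordered retirement might fit together.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCIDRegistry:
    """Illustrative stand-in for ConnectionIDRegistry (hypothetical API)."""

    # CID -> connection object for established connections
    established: dict = field(default_factory=dict)
    # CID -> sequence number, used for in-order retirement
    sequences: dict = field(default_factory=dict)
    # remote (host, port) -> connection, used for fallback routing
    by_addr: dict = field(default_factory=dict)
    # number of packets routed through the address fallback
    fallback_hits: int = 0

    def register(self, cid, conn, addr, seq):
        """Record a CID, its sequence number, and the peer address."""
        self.established[cid] = conn
        self.sequences[cid] = seq
        self.by_addr[addr] = conn

    def route(self, cid, addr):
        """Return the connection for a packet, or None if unknown."""
        conn = self.established.get(cid)
        if conn is None:
            # CID not registered yet: fall back to the address mapping.
            conn = self.by_addr.get(addr)
            if conn is not None:
                self.fallback_hits += 1
        return conn

    def retire_up_to(self, conn, retire_prior_to):
        """Retire this connection's CIDs with sequence < retire_prior_to."""
        doomed = sorted(
            (seq, cid)
            for cid, seq in self.sequences.items()
            if self.established.get(cid) is conn and seq < retire_prior_to
        )
        for _, cid in doomed:
            del self.established[cid]
            del self.sequences[cid]
        # CIDs are returned in the order they were retired (lowest first).
        return [cid for _, cid in doomed]
```

Both lookups are dictionary accesses, which is what makes the fallback path O(1) in this sketch.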

Performance Monitoring

The registry tracks performance metrics including:

  • Fallback routing usage count

  • Sequence number distribution

  • Operation timings (when debug mode is enabled)

These metrics can be accessed via the get_stats() method and reset using reset_stats().

Submodules

libp2p.transport.quic.config module

Configuration classes for QUIC transport.

class libp2p.transport.quic.config.QUICStreamFlowControlConfig(initial_window_size: int = 524288, max_window_size: int = 2097152, window_update_threshold: float = 0.5, enable_auto_tuning: bool = True)

Bases: object

Configuration for QUIC stream flow control.

class libp2p.transport.quic.config.QUICTransportConfig(max_connections_per_peer: int = 3, connection_timeout: float = 10.0, load_balancing_strategy: str = 'round_robin', idle_timeout: float = 30.0, max_datagram_size: int = 1200, local_port: int | None = None, enable_draft29: bool = True, enable_v1: bool = True, verify_mode: ~ssl.VerifyMode = VerifyMode.CERT_NONE, alpn_protocols: list[str] = <factory>, max_concurrent_streams: int = 100, connection_window: int = 1048576, stream_window: int = 65536, enable_qlog: bool = False, qlog_dir: str | None = None, max_connections: int = 1000, MAX_CONCURRENT_STREAMS: int = 1000, MAX_INCOMING_STREAMS: int = 1000, CONNECTION_HANDSHAKE_TIMEOUT: float = 60.0, MAX_OUTGOING_STREAMS: int = 1000, CONNECTION_CLOSE_TIMEOUT: int = 10, STREAM_OPEN_TIMEOUT: float = 30.0, STREAM_ACCEPT_TIMEOUT: float = 30.0, STREAM_READ_TIMEOUT: float = 30.0, STREAM_WRITE_TIMEOUT: float = 30.0, STREAM_CLOSE_TIMEOUT: float = 10.0, NEGOTIATION_SEMAPHORE_LIMIT: int = <factory>, NEGOTIATE_TIMEOUT: float = 30.0, STREAM_FLOW_CONTROL_WINDOW: int = 1048576, CONNECTION_FLOW_CONTROL_WINDOW: int = 1572864, MAX_STREAM_RECEIVE_BUFFER: int = 2097152, STREAM_RECEIVE_BUFFER_LOW_WATERMARK: int = 65536, STREAM_RECEIVE_BUFFER_HIGH_WATERMARK: int = 524288, ENABLE_STREAM_RESET_ON_ERROR: bool = True, STREAM_RESET_ERROR_CODE: int = 1, ENABLE_STREAM_KEEP_ALIVE: bool = False, STREAM_KEEP_ALIVE_INTERVAL: float = 30.0, ENABLE_STREAM_RESOURCE_TRACKING: bool = True, STREAM_MEMORY_LIMIT_PER_STREAM: int = 2097152, STREAM_MEMORY_LIMIT_PER_CONNECTION: int = 104857600, ENABLE_STREAM_BATCHING: bool = True, STREAM_BATCH_SIZE: int = 10, STREAM_PROCESSING_CONCURRENCY: int = 100, ENABLE_STREAM_METRICS: bool = True, ENABLE_STREAM_TIMELINE_TRACKING: bool = True, STREAM_METRICS_COLLECTION_INTERVAL: float = 60.0, STREAM_ERROR_RETRY_ATTEMPTS: int = 3, STREAM_ERROR_RETRY_DELAY: float = 1.0, STREAM_ERROR_RETRY_BACKOFF_FACTOR: float = 2.0, PROTOCOL_QUIC_V1: ~libp2p.custom_types.TProtocol = 'quic-v1', 
PROTOCOL_QUIC_DRAFT29: ~libp2p.custom_types.TProtocol = 'quic')

Bases: ConnectionConfig

Configuration for QUIC transport.

CONNECTION_CLOSE_TIMEOUT: int = 10

Timeout for closing a connection (seconds).

CONNECTION_FLOW_CONTROL_WINDOW: int = 1572864

Connection-wide flow control window size.

CONNECTION_HANDSHAKE_TIMEOUT: float = 60.0

Timeout for connection handshake (seconds).

ENABLE_STREAM_BATCHING: bool = True

Whether to batch multiple stream operations.

ENABLE_STREAM_KEEP_ALIVE: bool = False

Whether to enable stream keep-alive mechanisms.

ENABLE_STREAM_METRICS: bool = True

Whether to collect stream metrics.

ENABLE_STREAM_RESET_ON_ERROR: bool = True

Whether to automatically reset streams on errors.

ENABLE_STREAM_RESOURCE_TRACKING: bool = True

Whether to track stream resource usage.

ENABLE_STREAM_TIMELINE_TRACKING: bool = True

Whether to track stream lifecycle timelines.

MAX_CONCURRENT_STREAMS: int = 1000

Maximum number of concurrent streams per connection.

MAX_INCOMING_STREAMS: int = 1000

Maximum number of incoming streams per connection.

MAX_OUTGOING_STREAMS: int = 1000

Maximum number of outgoing streams per connection.

MAX_STREAM_RECEIVE_BUFFER: int = 2097152

Maximum receive buffer size per stream.

NEGOTIATE_TIMEOUT: float = 30.0

Timeout for multiselect protocol negotiation (seconds).

This is the maximum time allowed for a single protocol negotiation to complete. It should be coordinated with NEGOTIATION_SEMAPHORE_LIMIT: with higher limits, negotiations may take longer due to contention. This value is used by BasicHost when negotiating protocols on QUIC streams.

NEGOTIATION_SEMAPHORE_LIMIT: int

Maximum concurrent multiselect negotiations per direction (client/server).

This limits the number of simultaneous protocol negotiations that can occur on a QUIC connection to prevent resource exhaustion and contention. Separate semaphores are used for client (outbound) and server (inbound) directions to prevent deadlocks. This value should be coordinated with BasicHost’s negotiate_timeout for optimal performance. Defaults are platform-aware: Windows caps at 16 to keep aioquic stable; Linux/macOS can safely run 32.
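The platform-aware default and the per-direction semaphores described above can be sketched as follows. The names default_negotiation_limit and NegotiationLimits are hypothetical, and threading.BoundedSemaphore is used only as a stand-in for whatever primitive the transport actually uses; the values 16 and 32 are the ones stated above.

```python
import sys
import threading


def default_negotiation_limit(platform: str = sys.platform) -> int:
    """Return the per-direction limit: 16 on Windows, 32 elsewhere."""
    return 16 if platform == "win32" else 32


class NegotiationLimits:
    """Separate semaphores for outbound (client) and inbound (server)."""

    def __init__(self, limit=None):
        if limit is None:
            limit = default_negotiation_limit()
        # Independent semaphores, so a burst of inbound negotiations
        # cannot consume the slots needed by outbound ones.
        self.client = threading.BoundedSemaphore(limit)
        self.server = threading.BoundedSemaphore(limit)
```

Keeping the two directions independent is the property the text relies on to avoid deadlocks.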

PROTOCOL_QUIC_DRAFT29: TProtocol = 'quic'
PROTOCOL_QUIC_V1: TProtocol = 'quic-v1'
STREAM_ACCEPT_TIMEOUT: float = 30.0

Timeout for accepting incoming streams (seconds).

STREAM_BATCH_SIZE: int = 10

Number of streams to process in a batch.

STREAM_CLOSE_TIMEOUT: float = 10.0

Timeout for graceful stream close (seconds).

STREAM_ERROR_RETRY_ATTEMPTS: int = 3

Number of retry attempts for recoverable stream errors.

STREAM_ERROR_RETRY_BACKOFF_FACTOR: float = 2.0

Backoff factor for stream error retries.

STREAM_ERROR_RETRY_DELAY: float = 1.0

Initial delay between stream error retries (seconds).
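As a rough illustration of how the three retry settings interact, the sketch below computes a delay schedule. It assumes a plain exponential backoff (delay = initial delay × factor^attempt), which the reference above does not confirm; the function name retry_delays is hypothetical.

```python
def retry_delays(
    attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> list:
    """Return the wait before each retry under exponential backoff."""
    return [initial_delay * backoff_factor**i for i in range(attempts)]
```

Under that assumption, the defaults give waits of 1, 2, and 4 seconds, for 7 seconds of total waiting before the error is treated as unrecoverable.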

STREAM_FLOW_CONTROL_WINDOW: int = 1048576

Per-stream flow control window size.

STREAM_KEEP_ALIVE_INTERVAL: float = 30.0

Interval for stream keep-alive pings (seconds).

STREAM_MEMORY_LIMIT_PER_CONNECTION: int = 104857600

Total memory limit for all streams per connection.

STREAM_MEMORY_LIMIT_PER_STREAM: int = 2097152

Memory limit per individual stream.

STREAM_METRICS_COLLECTION_INTERVAL: float = 60.0

Interval for collecting stream metrics (seconds).

STREAM_OPEN_TIMEOUT: float = 30.0

Timeout for opening new streams (seconds).

Increased for high-concurrency scenarios.

STREAM_PROCESSING_CONCURRENCY: int = 100

Maximum concurrent stream processing tasks.

STREAM_READ_TIMEOUT: float = 30.0

Default timeout for stream read operations (seconds).

STREAM_RECEIVE_BUFFER_HIGH_WATERMARK: int = 524288

High watermark for stream receive buffer.

STREAM_RECEIVE_BUFFER_LOW_WATERMARK: int = 65536

Low watermark for stream receive buffer.
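High and low watermarks are commonly paired to give hysteresis: reading pauses once the buffer reaches the high mark and resumes only after it drains to the low mark. The sketch below assumes that usage, which the reference above does not spell out; the class WatermarkGate is hypothetical.

```python
class WatermarkGate:
    """Pause at the high watermark, resume at the low watermark."""

    def __init__(self, low: int = 65536, high: int = 524288) -> None:
        self.low = low
        self.high = high
        self.paused = False

    def update(self, buffered: int) -> bool:
        """Record the current buffer size and return the paused state."""
        if not self.paused and buffered >= self.high:
            self.paused = True
        elif self.paused and buffered <= self.low:
            self.paused = False
        return self.paused
```

The gap between the two marks keeps the gate from flapping when the buffer hovers around a single threshold.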

STREAM_RESET_ERROR_CODE: int = 1

Default error code for stream resets.

STREAM_WRITE_TIMEOUT: float = 30.0

Default timeout for stream write operations (seconds).

alpn_protocols: list[str]
connection_timeout: float = 10.0
connection_window: int = 1048576
enable_draft29: bool = True
enable_qlog: bool = False
enable_v1: bool = True
get_stream_config_dict() dict[str, Any]

Get stream-specific configuration as dictionary.

idle_timeout: float = 30.0
local_port: int | None = None
max_concurrent_streams: int = 100
max_connections: int = 1000
max_datagram_size: int = 1200
qlog_dir: str | None = None
stream_window: int = 65536
verify_mode: VerifyMode = 0
class libp2p.transport.quic.config.QUICTransportKwargs

Bases: TypedDict

Type definition for kwargs accepted by new_transport function.

PROTOCOL_QUIC_DRAFT29: TProtocol
PROTOCOL_QUIC_V1: TProtocol
alpn_protocols: list[str]
connection_timeout: float
connection_window: int
enable_draft29: bool
enable_qlog: bool
enable_v1: bool
idle_timeout: float
local_port: int | None
max_concurrent_streams: int
max_connections: int
max_datagram_size: int
qlog_dir: str | None
stream_window: int
verify_mode: VerifyMode
libp2p.transport.quic.config.create_stream_config_for_use_case(use_case: Literal['high_throughput', 'low_latency', 'many_streams', 'memory_constrained']) QUICTransportConfig

Create optimized stream configuration for specific use cases.

Args:

use_case: One of “high_throughput”, “low_latency”, “many_streams”, “memory_constrained”

Returns:

Optimized QUICTransportConfig

libp2p.transport.quic.connection module

QUIC Connection implementation. Manages bidirectional QUIC connections with integrated stream multiplexing.

class libp2p.transport.quic.connection.QUICConnection(quic_connection: QuicConnection, remote_addr: tuple[str, int], remote_peer_id: ID | None, local_peer_id: ID, is_initiator: bool, maddr: Multiaddr, transport: QUICTransport, security_manager: QUICTLSConfigManager | None = None, resource_scope: Any | None = None, listener_socket: SocketType | None = None, listener: QUICListener | None = None, listener_connection_id: bytes | None = None)

Bases: IRawConnection, IMuxedConn

QUIC connection implementing both raw connection and muxed connection interfaces.

Uses aioquic’s sans-IO core with trio for native async support. QUIC natively provides stream multiplexing, so this connection acts as both a raw connection (for transport layer) and muxed connection (for upper layers).

Features:

  • Native QUIC stream multiplexing

  • Integrated libp2p TLS security with peer identity verification

  • Resource-aware stream management

  • Comprehensive error handling

  • Flow control integration

  • Connection migration support

  • Performance monitoring

  • Complete connection ID management (fixes the original issue)

async accept_stream(timeout: float | None = None) QUICStream

Accept incoming stream.

Args:

timeout: Optional timeout. If None, waits indefinitely.

async close() None

Connection close with proper stream cleanup.

async connect(nursery: Nursery) None

Establish the QUIC connection using trio nursery for background tasks.

Args:

nursery: Trio nursery for managing connection background tasks

get_active_streams() list[QUICStream]

Get list of active streams.

get_connection_id_stats() dict[str, Any]

Get connection ID statistics and current state.

get_current_connection_id() bytes | None

Get the current connection ID.

async get_peer_certificate() Certificate | None

Get the peer’s TLS certificate.

Returns:

The peer’s X.509 certificate, or None if not available

get_remote_address() tuple[str, int]

Return the remote address of the connected peer.

Returns:

A tuple of (host, port).

get_security_info() dict[str, Any]

Get security-related information about the connection.

get_security_manager() QUICTLSConfigManager | None

Get the security manager for this connection.

get_stream_stats() dict[str, Any]

Get stream statistics for monitoring.

get_streams_by_protocol(protocol: str) list[QUICStream]

Get streams filtered by protocol.

property is_closed: bool

Check if connection is closed.

property is_established: bool

Check if connection is established (handshake completed).

property is_initiator: bool

Check if this connection is the initiator.

property is_peer_verified: bool

Check if peer identity has been verified.

property is_started: bool

Check if connection has been started.

local_peer_id() ID

Get the local peer ID.

multiaddr() Multiaddr

Get the multiaddr for this connection.

async open_stream(timeout: float | None = None) QUICStream

Open a new outbound stream.

Args:

timeout: Timeout for stream creation (defaults to STREAM_OPEN_TIMEOUT from config)

Returns:

New QUIC stream

Raises:

QUICStreamLimitError: Too many concurrent streams

QUICConnectionClosedError: Connection is closed

QUICStreamTimeoutError: Stream creation timed out

async read(n: int | None = -1) bytes

Read data from the stream.

Args:

n: Maximum number of bytes to read. -1 means read all available.

Returns:

Data bytes read from the stream.

Raises:

QUICStreamClosedError: If stream is closed for reading.

QUICStreamResetError: If stream was reset.

QUICStreamTimeoutError: If read timeout occurs.

remote_peer_id() ID | None

Get the remote peer ID.

set_listener_context(listener: QUICListener | None, listener_connection_id: bytes | None) None

Set owning listener context for O(1) Connection ID registration.

For server-side connections, the listener is responsible for packet routing.

set_resource_scope(scope: Any) None

Attach a resource scope to the connection for stream construction and cleanup.

set_stream_handler(handler_function: Callable[[QUICStream], Awaitable[None]]) None

Set handler for incoming streams.

Args:

handler_function: Function to handle new incoming streams

async start() None

Start the connection and its background tasks.

This method implements the IMuxedConn.start() interface. It should be called to begin processing connection events.

async write(data: bytes) None

Write data to the connection. For QUIC, this creates a new stream for each write operation.

libp2p.transport.quic.exceptions module

QUIC Transport exceptions

exception libp2p.transport.quic.exceptions.QUICAddressError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC addressing errors.

exception libp2p.transport.quic.exceptions.QUICAddressResolutionError(message: str, error_code: int | None = None)

Bases: QUICAddressError

Failed to resolve QUIC address.

exception libp2p.transport.quic.exceptions.QUICCertificateError(message: str, error_code: int | None = None)

Bases: QUICConfigurationError

Error with TLS certificate configuration.

exception libp2p.transport.quic.exceptions.QUICConfigurationError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC configuration errors.

exception libp2p.transport.quic.exceptions.QUICConnectionClosedError(message: str, error_code: int | None = None)

Bases: QUICConnectionError

QUIC connection has been closed.

exception libp2p.transport.quic.exceptions.QUICConnectionError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC connection operations.

exception libp2p.transport.quic.exceptions.QUICConnectionLimitError(message: str, error_code: int | None = None)

Bases: QUICResourceError

Connection limit exceeded.

exception libp2p.transport.quic.exceptions.QUICConnectionTimeoutError(message: str, error_code: int | None = None)

Bases: QUICConnectionError

QUIC connection operation timed out.

exception libp2p.transport.quic.exceptions.QUICDialError(message: str, error_code: int | None = None)

Bases: QUICTransportError

Error occurred during QUIC connection establishment.

exception libp2p.transport.quic.exceptions.QUICError(message: str, error_code: int | None = None)

Bases: Exception

Base exception for all QUIC transport errors.

class libp2p.transport.quic.exceptions.QUICErrorContext(operation: str, component: str = 'quic')

Bases: object

Context manager for handling QUIC errors with automatic error mapping. Useful for converting low-level aioquic errors to py-libp2p QUIC errors.
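The error-mapping idea can be sketched with a stand-alone context manager. This is an assumption about the general pattern, not the behaviour of QUICErrorContext itself: ToyQUICError and ToyErrorContext are hypothetical names, and whether the real class is synchronous or asynchronous is not stated above.

```python
class ToyQUICError(Exception):
    """Stand-in for the transport's base QUIC exception."""


class ToyErrorContext:
    """Wrap low-level exceptions raised inside the block."""

    def __init__(self, operation: str, component: str = "quic") -> None:
        self.operation = operation
        self.component = component

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # No error, or already a mapped error: let it pass through unchanged.
        if exc is None or isinstance(exc, ToyQUICError):
            return False
        # Re-raise as a mapped error, keeping the original as __cause__.
        message = f"{self.component}: {self.operation} failed: {exc}"
        raise ToyQUICError(message) from exc
```

Chaining with `from exc` keeps the low-level traceback available for debugging while callers only need to catch one exception family.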

exception libp2p.transport.quic.exceptions.QUICFlowControlDeadlockError(message: str, error_code: int | None = None)

Bases: QUICFlowControlError

Flow control deadlock detected.

exception libp2p.transport.quic.exceptions.QUICFlowControlError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for flow control related errors.

exception libp2p.transport.quic.exceptions.QUICFlowControlViolationError(message: str, error_code: int | None = None)

Bases: QUICFlowControlError

Flow control limits were violated.

exception libp2p.transport.quic.exceptions.QUICHandshakeError(message: str, error_code: int | None = None)

Bases: QUICConnectionError

Error during QUIC handshake process.

exception libp2p.transport.quic.exceptions.QUICInvalidConfigError(message: str, error_code: int | None = None)

Bases: QUICConfigurationError

Invalid QUIC configuration parameters.

exception libp2p.transport.quic.exceptions.QUICInvalidMultiaddrError(message: str, error_code: int | None = None)

Bases: QUICAddressError

Invalid multiaddr format for QUIC transport.

exception libp2p.transport.quic.exceptions.QUICListenError(message: str, error_code: int | None = None)

Bases: QUICTransportError

Error occurred during QUIC listener operations.

exception libp2p.transport.quic.exceptions.QUICMemoryLimitError(message: str, error_code: int | None = None)

Bases: QUICResourceError

Memory limit exceeded.

exception libp2p.transport.quic.exceptions.QUICPeerVerificationError(message: str, error_code: int | None = None)

Bases: QUICConnectionError

Error verifying peer identity during handshake.

exception libp2p.transport.quic.exceptions.QUICProtocolError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC protocol errors.

exception libp2p.transport.quic.exceptions.QUICResourceError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for resource management errors.

exception libp2p.transport.quic.exceptions.QUICSecurityError(message: str, error_code: int | None = None)

Bases: QUICTransportError

Error related to QUIC security/TLS operations.

exception libp2p.transport.quic.exceptions.QUICStreamBackpressureError(message: str, stream_id: str | None = None, error_code: int | None = None)

Bases: QUICStreamError

Stream write blocked due to flow control.

exception libp2p.transport.quic.exceptions.QUICStreamClosedError(message: str, stream_id: str | None = None, error_code: int | None = None)

Bases: QUICStreamError

Stream is closed and cannot be used for I/O operations.

exception libp2p.transport.quic.exceptions.QUICStreamError(message: str, stream_id: str | None = None, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC stream operations.

exception libp2p.transport.quic.exceptions.QUICStreamLimitError(message: str, stream_id: str | None = None, error_code: int | None = None)

Bases: QUICStreamError

Stream limit reached (too many concurrent streams).

exception libp2p.transport.quic.exceptions.QUICStreamResetError(message: str, stream_id: str | None = None, error_code: int | None = None, reset_by_peer: bool = False)

Bases: QUICStreamError

Stream was reset by local or remote peer.

exception libp2p.transport.quic.exceptions.QUICStreamStateError(message: str, stream_id: str | None = None, current_state: str | None = None, attempted_operation: str | None = None)

Bases: QUICStreamError

Invalid operation for current stream state.

exception libp2p.transport.quic.exceptions.QUICStreamTimeoutError(message: str, stream_id: str | None = None, error_code: int | None = None)

Bases: QUICStreamError

Stream operation timed out.

exception libp2p.transport.quic.exceptions.QUICTransportError(message: str, error_code: int | None = None)

Bases: QUICError

Base exception for QUIC transport operations.

exception libp2p.transport.quic.exceptions.QUICUnsupportedVersionError(message: str, error_code: int | None = None)

Bases: QUICProtocolError

Unsupported QUIC version.

exception libp2p.transport.quic.exceptions.QUICVersionNegotiationError(message: str, error_code: int | None = None)

Bases: QUICProtocolError

QUIC version negotiation failed.

libp2p.transport.quic.exceptions.create_connection_error(error_type: str, message: str, error_code: int | None = None) QUICConnectionError

Factory function to create appropriate connection error based on type.

Args:

error_type: Type of error (“closed”, “timeout”, “handshake”, etc.)

message: Error message

error_code: QUIC error code

Returns:

Appropriate QUICConnectionError subclass

libp2p.transport.quic.exceptions.create_stream_error(error_type: str, message: str, stream_id: str | None = None, error_code: int | None = None) QUICStreamError

Factory function to create appropriate stream error based on type.

Args:

error_type: Type of error (“closed”, “reset”, “timeout”, “backpressure”, etc.)

message: Error message

stream_id: Stream identifier

error_code: QUIC error code

Returns:

Appropriate QUICStreamError subclass
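Both factory functions follow the same pattern: a string key selects an exception subclass. A minimal stand-alone sketch of that pattern is shown below. The Toy* classes and make_stream_error are hypothetical, and falling back to the base class for unknown types is an assumption rather than documented behaviour.

```python
class ToyStreamError(Exception):
    """Stand-in base class carrying the stream ID and error code."""

    def __init__(self, message, stream_id=None, error_code=None):
        super().__init__(message)
        self.stream_id = stream_id
        self.error_code = error_code


class ToyStreamClosedError(ToyStreamError):
    pass


class ToyStreamResetError(ToyStreamError):
    pass


class ToyStreamTimeoutError(ToyStreamError):
    pass


# Lookup table from the error_type string to the exception class.
_STREAM_ERRORS = {
    "closed": ToyStreamClosedError,
    "reset": ToyStreamResetError,
    "timeout": ToyStreamTimeoutError,
}


def make_stream_error(error_type, message, stream_id=None, error_code=None):
    """Build the subclass for error_type, or the base class if unknown."""
    cls = _STREAM_ERRORS.get(error_type, ToyStreamError)
    return cls(message, stream_id, error_code)
```

Because every subclass derives from the same base, callers can catch the base class when the specific cause does not matter.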

libp2p.transport.quic.exceptions.map_quic_error_code(error_code: int) str

Map QUIC error codes to human-readable descriptions. Based on RFC 9000 Transport Error Codes.
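For reference, the transport error codes defined in RFC 9000 can be tabulated as in the sketch below. The function name describe_error_code and the output strings are illustrative and may differ from what map_quic_error_code returns.

```python
# Transport error codes from RFC 9000, Section 20.1.
_TRANSPORT_ERRORS = {
    0x00: "NO_ERROR",
    0x01: "INTERNAL_ERROR",
    0x02: "CONNECTION_REFUSED",
    0x03: "FLOW_CONTROL_ERROR",
    0x04: "STREAM_LIMIT_ERROR",
    0x05: "STREAM_STATE_ERROR",
    0x06: "FINAL_SIZE_ERROR",
    0x07: "FRAME_ENCODING_ERROR",
    0x08: "TRANSPORT_PARAMETER_ERROR",
    0x09: "CONNECTION_ID_LIMIT_ERROR",
    0x0A: "PROTOCOL_VIOLATION",
    0x0B: "INVALID_TOKEN",
    0x0C: "APPLICATION_ERROR",
    0x0D: "CRYPTO_BUFFER_EXCEEDED",
    0x0E: "KEY_UPDATE_ERROR",
    0x0F: "AEAD_LIMIT_REACHED",
    0x10: "NO_VIABLE_PATH",
}


def describe_error_code(code: int) -> str:
    """Return a readable name for a QUIC transport error code."""
    # 0x0100-0x01ff is reserved for CRYPTO_ERROR; the low byte is the TLS alert.
    if 0x0100 <= code <= 0x01FF:
        return f"CRYPTO_ERROR (TLS alert {code & 0xFF})"
    return _TRANSPORT_ERRORS.get(code, f"UNKNOWN_ERROR (0x{code:x})")
```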

libp2p.transport.quic.listener module

QUIC Listener

class libp2p.transport.quic.listener.QUICListener(transport: QUICTransport, handler_function: Callable[[object], Awaitable[None]], quic_configs: dict[TProtocol, QuicConfiguration], config: QUICTransportConfig, security_manager: QUICTLSConfigManager | None = None)

Bases: IListener

QUIC Listener with connection ID handling and protocol negotiation.

async close() None

Close the listener and clean up resources.

get_addresses() list[Multiaddr]

Get the bound addresses.

get_addrs() tuple[Multiaddr, ...]

Retrieve the list of addresses on which the listener is active.

Returns

tuple[Multiaddr, …]

A tuple of multiaddresses.

get_stats() dict[str, int | bool]

Get listener statistics including the listening state.

Returns:

dict: Statistics dictionary with current state information

is_listening() bool

Check if the listener is currently listening for connections.

Returns:

bool: True if the listener is actively listening, False otherwise

async listen(maddr: Multiaddr, nursery: Nursery) bool

Start listening on the given multiaddr with enhanced connection handling.

parse_quic_packet(data: bytes) QUICPacketInfo | None

Parse QUIC packet header to extract connection IDs and version. Based on RFC 9000 packet format.
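The long-header layout that such a parser reads is fixed by RFC 9000 (Section 17.2): a first byte with the high bit set, a 32-bit version, then length-prefixed destination and source connection IDs. The sketch below parses just those fields. It is a simplified illustration, not the listener's implementation: LongHeader and parse_long_header are hypothetical names, and tokens and short-header packets are not handled.

```python
import struct
from typing import NamedTuple, Optional


class LongHeader(NamedTuple):
    version: int
    packet_type: int  # 2-bit long packet type; 0 is Initial in QUIC v1
    destination_cid: bytes
    source_cid: bytes


def parse_long_header(data: bytes) -> Optional[LongHeader]:
    """Extract version and connection IDs from a long-header packet."""
    # Minimum size: first byte + version + DCID length + SCID length.
    if len(data) < 7 or not data[0] & 0x80:
        return None  # too short, or a short-header packet
    packet_type = (data[0] & 0x30) >> 4
    (version,) = struct.unpack_from("!I", data, 1)
    pos = 5
    dcid_len = data[pos]
    pos += 1
    # The DCID must be followed by at least the SCID length byte.
    if len(data) < pos + dcid_len + 1:
        return None
    dcid = data[pos:pos + dcid_len]
    pos += dcid_len
    scid_len = data[pos]
    pos += 1
    if len(data) < pos + scid_len:
        return None
    scid = data[pos:pos + scid_len]
    return LongHeader(version, packet_type, dcid, scid)
```

Returning None for anything malformed mirrors the optional return type in the signature above, so callers can drop unparseable datagrams without exception handling.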

class libp2p.transport.quic.listener.QUICPacketInfo(version: int, destination_cid: bytes, source_cid: bytes, packet_type: QuicPacketType, token: bytes | None = None)

Bases: object

Information extracted from a QUIC packet header.

libp2p.transport.quic.security module

QUIC Security helpers implementation

class libp2p.transport.quic.security.CertificateGenerator

Bases: object

Generates X.509 certificates with libp2p peer identity extensions. Follows libp2p TLS specification for QUIC transport.

generate_certificate(libp2p_private_key: PrivateKey, peer_id: ID, validity_days: int = 365) TLSConfig

Generate a TLS certificate with embedded libp2p peer identity. Fixed to use datetime objects for validity periods.

Args:

libp2p_private_key: The libp2p identity private key

peer_id: The libp2p peer ID

validity_days: Certificate validity period in days

Returns:

TLSConfig with certificate and private key

Raises:

QUICCertificateError: If certificate generation fails

class libp2p.transport.quic.security.LibP2PExtensionHandler

Bases: object

Handles libp2p-specific TLS extensions for peer identity verification.

Based on libp2p TLS specification: https://github.com/libp2p/specs/blob/master/tls/tls.md

static create_signed_key_extension(libp2p_private_key: PrivateKey, cert_public_key: bytes) bytes

Create the libp2p Public Key Extension with signed key proof.

The extension contains:

1. The libp2p public key

2. A signature proving ownership of the private key

Args:

libp2p_private_key: The libp2p identity private key

cert_public_key: The certificate’s public key bytes

Returns:

Encoded extension value

static parse_signed_key_extension(extension: Extension[Any]) tuple[PublicKey, bytes]

Parse the libp2p Public Key Extension with support for all crypto types. Handles both ASN.1 DER format (from go-libp2p) and simple binary format.

class libp2p.transport.quic.security.LibP2PKeyConverter

Bases: object

Converts between libp2p key formats and cryptography library formats. Handles different key types: Ed25519, Secp256k1, RSA, ECDSA.

static deserialize_public_key(key_bytes: bytes) PublicKey

Deserialize libp2p public key from protobuf bytes.

Args:

key_bytes: Protobuf-serialized public key bytes

Returns:

Deserialized PublicKey instance

static libp2p_to_tls_private_key(libp2p_key: PrivateKey) EllipticCurvePrivateKey | RSAPrivateKey

Convert libp2p private key to TLS-compatible private key.

For certificate generation, we create a separate ephemeral key rather than using the libp2p identity key directly.

static serialize_public_key(public_key: PublicKey) bytes

Serialize libp2p public key to bytes.

class libp2p.transport.quic.security.PeerAuthenticator

Bases: object

Authenticates remote peers using libp2p TLS certificates. Validates both TLS certificate integrity and libp2p peer identity.

verify_peer_certificate(certificate: Certificate, expected_peer_id: ID | None = None) ID

Verify a peer’s TLS certificate and extract/validate peer identity.

Args:

certificate: The peer’s TLS certificate

expected_peer_id: Expected peer ID (for outbound connections)

Returns:

The verified peer ID

Raises:

QUICPeerVerificationError: If verification fails

class libp2p.transport.quic.security.QUICTLSConfigManager(libp2p_private_key: PrivateKey, peer_id: ID)

Bases: object

Manages TLS configuration for QUIC transport with libp2p security. Integrates with aioquic’s TLS configuration system.

create_client_config() QUICTLSSecurityConfig

Create client configuration using the new class-based approach.

Returns:

QUICTLSSecurityConfig instance for client

create_server_config() QUICTLSSecurityConfig

Create server configuration using the new class-based approach.

Returns:

QUICTLSSecurityConfig instance for server

get_local_peer_id() ID

Get the local peer ID.

verify_peer_identity(peer_certificate: Certificate, expected_peer_id: ID | None = None) ID

Verify remote peer’s identity from their TLS certificate.

Args:

peer_certificate: Remote peer’s TLS certificate

expected_peer_id: Expected peer ID (for outbound connections)

Returns:

Verified peer ID

class libp2p.transport.quic.security.QUICTLSSecurityConfig(certificate: ~cryptography.hazmat.bindings._rust.x509.Certificate, private_key: ~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey | ~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey, certificate_chain: list[~cryptography.hazmat.bindings._rust.x509.Certificate] = <factory>, alpn_protocols: list[str] = <factory>, verify_mode: ~ssl.VerifyMode = VerifyMode.CERT_NONE, check_hostname: bool = False, request_client_certificate: bool = False, peer_id: ~libp2p.peer.id.ID | None = None, is_client_config: bool = False, config_name: str | None = None)

Bases: object

Type-safe TLS security configuration for QUIC transport.

alpn_protocols: list[str]
certificate: Certificate
certificate_chain: list[Certificate]
check_hostname: bool = False
config_name: str | None = None
debug_config() None

Log debugging information about this configuration at debug level.

get_certificate_info() dict[Any, Any]

Get certificate information for debugging.

Returns:

Dictionary with certificate details

has_libp2p_extension() bool

Check if the certificate has the required libp2p extension.

Returns:

True if libp2p extension is present

is_certificate_valid() bool

Check if the certificate is currently valid (not expired).

Returns:

True if certificate is valid

is_client_config: bool = False
peer_id: ID | None = None
private_key: EllipticCurvePrivateKey | RSAPrivateKey
request_client_certificate: bool = False
validate_certificate_key_match() bool

Validate that the certificate and private key match.

Returns:

True if certificate and private key match

verify_mode: VerifyMode = 0
class libp2p.transport.quic.security.TLSConfig(certificate: Certificate, private_key: EllipticCurvePrivateKey | RSAPrivateKey, peer_id: ID)

Bases: object

TLS configuration for QUIC transport with libp2p extensions.

certificate: Certificate
get_certificate_der() bytes

Get certificate in DER format for external use.

get_certificate_pem() bytes

Get certificate in PEM format.

get_private_key_der() bytes

Get private key in DER format for external use.

get_private_key_pem() bytes

Get private key in PEM format.

peer_id: ID
private_key: EllipticCurvePrivateKey | RSAPrivateKey
libp2p.transport.quic.security.create_client_tls_config(certificate: Certificate, private_key: EllipticCurvePrivateKey | RSAPrivateKey, peer_id: ID | None = None, **kwargs: Any) QUICTLSSecurityConfig

Create a client TLS configuration.

Args:

certificate: X.509 certificate

private_key: Private key corresponding to certificate

peer_id: Optional peer ID for validation

kwargs: Additional configuration parameters

Returns:

Client TLS configuration

libp2p.transport.quic.security.create_quic_security_transport(libp2p_private_key: PrivateKey, peer_id: ID) QUICTLSConfigManager

Factory function to create QUIC security transport.

Args:

libp2p_private_key: The libp2p identity private key

peer_id: The libp2p peer ID

Returns:

Configured QUIC TLS manager

libp2p.transport.quic.security.create_server_tls_config(certificate: Certificate, private_key: EllipticCurvePrivateKey | RSAPrivateKey, peer_id: ID | None = None, **kwargs: Any) QUICTLSSecurityConfig

Create a server TLS configuration.

Args:

certificate: X.509 certificate

private_key: Private key corresponding to certificate

peer_id: Optional peer ID for validation

kwargs: Additional configuration parameters

Returns:

Server TLS configuration

libp2p.transport.quic.stream module

QUIC Stream implementation. Provides a stream interface over QUIC’s native multiplexing.

class libp2p.transport.quic.stream.QUICStream(connection: QUICConnection, stream_id: int, direction: StreamDirection, remote_addr: tuple[str, int], resource_scope: Any | None = None)

Bases: object

QUIC Stream implementation following libp2p IMuxedStream interface.

Based on patterns from go-libp2p and js-libp2p, this implementation:

  • Leverages QUIC’s native multiplexing and flow control

  • Integrates with libp2p resource management

  • Provides comprehensive error handling with QUIC-specific codes

  • Supports bidirectional communication with independent close semantics

  • Implements proper stream lifecycle management

can_read() bool

Check if stream can be read from.

can_write() bool

Check if stream can be written to.

async close() None

Close the stream gracefully (both read and write sides).

This implements proper close semantics where both sides are closed and resources are cleaned up.

async close_read() None

Close the read side of the stream.

async close_write() None

Close the write side of the stream.

property direction: StreamDirection

Get stream direction.

get_remote_address() tuple[str, int]
async handle_data_received(data: bytes, end_stream: bool) None

Handle data received from the QUIC connection.

Args:

data: Received data

end_stream: Whether this is the last data (FIN received)

async handle_flow_control_update(available_window: int) None

Handle flow control window updates.

Args:

available_window: Available flow control window size

async handle_reset(error_code: int) None

Handle stream reset from remote peer.

Args:

error_code: QUIC error code from reset frame

async handle_stop_sending(error_code: int) None

Handle STOP_SENDING frame from remote peer.

When a STOP_SENDING frame is received, the peer is requesting that we stop sending data on this stream. We respond by resetting the stream.

Args:

error_code: Error code from the STOP_SENDING frame

is_closed() bool

Check if stream is completely closed.

property is_initiator: bool

Check if this stream was locally initiated.

is_reset() bool

Check if stream was reset.

property muxed_conn: QUICConnection

Get the parent muxed connection.

property protocol: object | None

Get the protocol identifier for this stream.

async read(n: int | None = None) bytes

Read data from the stream with QUIC flow control.

Args:

n: Maximum number of bytes to read. If None or -1, read all available.

Returns:

Data read from stream

Raises:

QUICStreamClosedError: Stream is closed
QUICStreamResetError: Stream was reset
QUICStreamTimeoutError: Read timeout exceeded
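
The meaning of n in read() can be illustrated with a small buffer helper. This is only a sketch of the "None or -1 reads all available" rule: take_from_buffer is a hypothetical name, and the real method presumably also waits for data and raises the errors listed above, which this sketch does not model.

```python
from typing import Optional


def take_from_buffer(buffer: bytearray, n: Optional[int] = None) -> bytes:
    """Remove and return up to n bytes; None or -1 drains the whole buffer.

    Hypothetical helper, not part of the libp2p API.
    """
    if n is None or n == -1:
        count = len(buffer)
    elif n < 0:
        raise ValueError("n must be None, -1, or non-negative")
    else:
        # Never return more than is currently buffered.
        count = min(n, len(buffer))
    chunk = bytes(buffer[:count])
    del buffer[:count]  # consume the bytes that were handed out
    return chunk
```

A bounded call such as take_from_buffer(buf, 5) leaves the remainder in place for the next read, while an unbounded call empties the buffer.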

async reset(error_code: int = 0) None

Reset the stream with the given error code.

Args:

error_code: QUIC error code for the reset

set_deadline(ttl: int) bool

Set a deadline for the stream. QUIC does not support deadlines natively, so this method always returns False to indicate the operation is unsupported.

Parameters:

ttl – Time-to-live in seconds (ignored).

Returns:

False, as deadlines are not supported.

property state: StreamState

Get current stream state.

property stream_id: str

Get stream ID as string for libp2p compatibility.

async wait_ready_for_io(timeout: float = 1.0) None

Wait for stream to be ready for I/O operations.

This ensures the stream and its parent connection are ready before attempting to read/write. For outbound streams, this ensures the connection is established and the stream can write.

Args:

timeout: Maximum time to wait in seconds

Raises:

QUICStreamTimeoutError: If stream is not ready within timeout

async write(data: bytes) None

Write data to the stream with QUIC flow control.

Args:

data: Data to write

Raises:

QUICStreamClosedError: Stream is closed for writing
QUICStreamBackpressureError: Flow control window exhausted
QUICStreamResetError: Stream was reset

class libp2p.transport.quic.stream.StreamDirection(value)

Bases: Enum

Stream direction for tracking initiator.

INBOUND = 'inbound'
OUTBOUND = 'outbound'
class libp2p.transport.quic.stream.StreamState(value)

Bases: Enum

Stream lifecycle states following libp2p patterns.

CLOSED = 'closed'
OPEN = 'open'
READ_CLOSED = 'read_closed'
RESET = 'reset'
WRITE_CLOSED = 'write_closed'
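
One plausible reading of how these states relate to close_read(), close_write(), close(), and reset() is sketched here. The enum values are copied from the listing above; the transition function next_state and its action strings are assumptions made for illustration, and the actual QUICStream logic may differ.

```python
from enum import Enum


class StreamState(Enum):
    # Values copied from the documented enum.
    OPEN = "open"
    READ_CLOSED = "read_closed"
    WRITE_CLOSED = "write_closed"
    CLOSED = "closed"
    RESET = "reset"


def next_state(state: StreamState, action: str) -> StreamState:
    """Hypothetical transition table for the stream lifecycle."""
    if state in (StreamState.CLOSED, StreamState.RESET):
        return state  # treated as terminal in this sketch
    if action == "reset":
        return StreamState.RESET
    if action == "close":
        return StreamState.CLOSED
    if action == "close_read":
        # Closing the second half completes the close.
        if state is StreamState.WRITE_CLOSED:
            return StreamState.CLOSED
        return StreamState.READ_CLOSED
    if action == "close_write":
        if state is StreamState.READ_CLOSED:
            return StreamState.CLOSED
        return StreamState.WRITE_CLOSED
    raise ValueError(f"unknown action: {action}")
```

In this sketch, closing the read and write sides independently converges on CLOSED, which matches the "independent close semantics" described for QUICStream.
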
class libp2p.transport.quic.stream.StreamTimeline

Bases: object

Track stream lifecycle events for debugging and monitoring.

record_close() None
record_first_data() None
record_open() None
record_reset(error_code: int) None

libp2p.transport.quic.transport module

QUIC Transport implementation

class libp2p.transport.quic.transport.QUICTransport(private_key: PrivateKey, config: QUICTransportConfig | None = None)

Bases: ITransport

QUIC transport implementation following the libp2p ITransport interface.

can_dial(maddr: Multiaddr) bool

Check if this transport can dial the given multiaddr.

Args:

maddr: Multiaddr to check

Returns:

True if this transport can dial the address

async close() None

Close the transport and cleanup resources.

create_listener(handler_function: Callable[[object], Awaitable[None]]) QUICListener

Create a QUIC listener with integrated security.

Args:

handler_function: Function to handle new connections

Returns:

QUIC listener instance

Raises:

QUICListenError: If transport is closed

async dial(maddr: Multiaddr) QUICConnection

Dial a remote peer using QUIC transport with security verification.

Args:

maddr: Multiaddr of the remote peer (e.g., /ip4/1.2.3.4/udp/4001/quic-v1)
peer_id: Expected peer ID for verification
nursery: Nursery to execute the background tasks

Returns:

Raw connection interface to the remote peer

Raises:

QUICDialError: If dialing fails
QUICSecurityError: If security verification fails

get_listener_socket() SocketType | None

Get the socket from the first active listener.

get_security_manager() QUICTLSConfigManager

Get the security manager for this transport.

Returns:

The QUIC TLS configuration manager

get_stats() dict[str, int | list[str] | object]

Get transport statistics including security info.

listen_order() int

Get the listen order priority for this transport. Matches go-libp2p’s ListenOrder = 1 for QUIC.

Returns:

Priority order for listening (lower = higher priority)
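
How a caller might use this value can be sketched as a simple sort. FakeTransport, sort_for_listening, and every order value other than QUIC's documented 1 are hypothetical; how the swarm actually consumes listen_order() is not described here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeTransport:
    """Stand-in exposing only listen_order(); not a real libp2p class."""

    name: str
    order: int

    def listen_order(self) -> int:
        return self.order


def sort_for_listening(transports: List[FakeTransport]) -> List[FakeTransport]:
    # Lower value means higher priority, so an ascending sort is enough.
    # sorted() is stable, so ties keep their original relative order.
    return sorted(transports, key=lambda t: t.listen_order())
```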

protocols() list[TProtocol]

Get supported protocol identifiers.

Returns:

List of supported protocol strings

set_background_nursery(nursery: Nursery) None

Set the nursery to use for background tasks (called by swarm).

set_swarm(swarm: object) None

Set the swarm for adding incoming connections.

libp2p.transport.quic.utils module

Multiaddr utilities for QUIC transport - Module 4. Essential utilities required for QUIC transport implementation. Based on go-libp2p and js-libp2p QUIC implementations.

libp2p.transport.quic.utils.create_client_config_from_base(base_config: QuicConfiguration, security_manager: QUICTLSConfigManager | None = None, transport_config: QUICTransportConfig | None = None) QuicConfiguration

Create a client configuration without using deepcopy.

libp2p.transport.quic.utils.create_quic_multiaddr(host: str, port: int, version: str = 'quic-v1') Multiaddr

Create a QUIC multiaddr from host, port, and version.

Args:

host: IP address (IPv4 or IPv6)
port: UDP port number
version: QUIC version (“quic-v1” or “quic”)

Returns:

QUIC multiaddr

Raises:

QUICInvalidMultiaddrError: If invalid parameters provided
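
The address layout this function produces can be approximated with plain strings. The sketch below is not the library implementation: build_quic_multiaddr_str is a hypothetical helper that returns a str instead of a Multiaddr and raises ValueError where the real function raises QUICInvalidMultiaddrError.

```python
import ipaddress


def build_quic_multiaddr_str(host: str, port: int, version: str = "quic-v1") -> str:
    """Format /ip4|ip6/<host>/udp/<port>/<version> (hypothetical helper)."""
    if version not in ("quic-v1", "quic"):
        raise ValueError(f"unsupported QUIC version: {version}")
    if not 0 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    # ip_address() raises ValueError when host is not an IP literal.
    ip = ipaddress.ip_address(host)
    family = "ip4" if ip.version == 4 else "ip6"
    return f"/{family}/{host}/udp/{port}/{version}"
```

For example, build_quic_multiaddr_str("::1", 4001, "quic") selects the ip6 prefix from the parsed host rather than requiring the caller to pass the address family.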

libp2p.transport.quic.utils.create_server_config_from_base(base_config: QuicConfiguration, security_manager: QUICTLSConfigManager | None = None, transport_config: QUICTransportConfig | None = None) QuicConfiguration

Create a server configuration without using deepcopy. Manually copies attributes while handling cryptography objects properly.

libp2p.transport.quic.utils.custom_quic_version_to_wire_format(version: TProtocol) int

Convert QUIC version string to wire format integer for aioquic.

Args:

version: QUIC version string (“quic-v1” or “quic”)

Returns:

Wire format version number

Raises:

QUICUnsupportedVersionError: If version is not supported

libp2p.transport.quic.utils.get_alpn_protocols() list[str]

Get ALPN protocols for libp2p over QUIC.

Returns:

List of ALPN protocol identifiers

libp2p.transport.quic.utils.is_quic_multiaddr(maddr: Multiaddr) bool

Check if a multiaddr represents a QUIC address.

Valid QUIC multiaddrs:

  • /ip4/127.0.0.1/udp/4001/quic-v1

  • /ip4/127.0.0.1/udp/4001/quic

  • /ip6/::1/udp/4001/quic-v1

  • /ip6/::1/udp/4001/quic

Args:

maddr: Multiaddr to check

Returns:

True if the multiaddr represents a QUIC address
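
The shape of a valid address can be checked on the string form as a rough illustration. looks_like_quic_multiaddr is a hypothetical helper operating on str; the library function takes a Multiaddr and may accept or reject cases this sketch handles differently.

```python
import ipaddress


def looks_like_quic_multiaddr(addr: str) -> bool:
    """Return True for /ip4|ip6/<ip>/udp/<port>/quic[-v1] (hypothetical)."""
    parts = addr.split("/")
    # A leading "/" yields an empty first element, then five components.
    if len(parts) != 6 or parts[0] != "":
        return False
    _, family, host, transport, port, version = parts
    if family not in ("ip4", "ip6") or transport != "udp":
        return False
    if version not in ("quic-v1", "quic"):
        return False
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False
    # The declared family must match the parsed address family.
    if ip.version != (4 if family == "ip4" else 6):
        return False
    return port.isascii() and port.isdigit() and int(port) <= 65535
```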

libp2p.transport.quic.utils.multiaddr_to_quic_version(maddr: Multiaddr) TProtocol

Determine QUIC version from multiaddr.

Args:

maddr: QUIC multiaddr

Returns:

QUIC version identifier (“quic-v1” or “quic”)

Raises:

QUICInvalidMultiaddrError: If multiaddr doesn’t contain QUIC protocol

libp2p.transport.quic.utils.normalize_quic_multiaddr(maddr: Multiaddr) Multiaddr

Normalize a QUIC multiaddr to canonical form.

Args:

maddr: Input QUIC multiaddr

Returns:

Normalized multiaddr

Raises:

QUICInvalidMultiaddrError: If not a valid QUIC multiaddr

libp2p.transport.quic.utils.quic_multiaddr_to_endpoint(maddr: Multiaddr) tuple[str, int]

Extract host and port from a QUIC multiaddr.

Args:

maddr: QUIC multiaddr

Returns:

Tuple of (host, port)

Raises:

QUICInvalidMultiaddrError: If multiaddr is not a valid QUIC address
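
The extraction itself amounts to picking two components out of the address. The following sketch works on the string form only; endpoint_from_quic_multiaddr_str is a hypothetical name, and ValueError stands in for QUICInvalidMultiaddrError.

```python
from typing import Tuple


def endpoint_from_quic_multiaddr_str(addr: str) -> Tuple[str, int]:
    """Return (host, port) from /ip4|ip6/<host>/udp/<port>/quic[-v1]."""
    parts = addr.split("/")
    if (
        len(parts) != 6
        or parts[0] != ""
        or parts[1] not in ("ip4", "ip6")
        or parts[3] != "udp"
        or parts[5] not in ("quic-v1", "quic")
    ):
        raise ValueError(f"not a QUIC multiaddr: {addr}")
    # int() raises ValueError if the port component is not numeric.
    return parts[2], int(parts[4])
```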

libp2p.transport.quic.utils.quic_version_to_wire_format(version: TProtocol) int

Convert QUIC version string to wire format integer for aioquic.

Args:

version: QUIC version string (“quic-v1” or “quic”)

Returns:

Wire format version number

Raises:

QUICUnsupportedVersionError: If version is not supported
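
A version lookup of this kind can be sketched as a dictionary. The value 0x00000001 is the QUIC version 1 number from RFC 9000. Mapping "quic" to draft-29 (0xFF00001D) follows the usual libp2p convention for /quic addresses and is an assumption about this module. version_to_wire is a hypothetical name, and ValueError stands in for QUICUnsupportedVersionError.

```python
# Assumed mapping; "quic" is taken to mean draft-29 per libp2p convention.
QUIC_WIRE_VERSIONS = {
    "quic-v1": 0x00000001,  # RFC 9000, QUIC version 1
    "quic": 0xFF00001D,     # draft-29 (assumption)
}


def version_to_wire(version: str) -> int:
    """Look up the 32-bit wire version for a protocol string."""
    try:
        return QUIC_WIRE_VERSIONS[version]
    except KeyError:
        raise ValueError(f"unsupported QUIC version: {version}") from None
```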

Module contents