libp2p.transport.websocket package

Submodules

libp2p.transport.websocket.connection module

class libp2p.transport.websocket.connection.P2PWebSocketConnection(ws_connection: WebSocketConnection | Any, local_addr: Multiaddr | None = None, remote_addr: Multiaddr | None = None, ssl_context: SSLContext | None = None, max_message_size: int = 1048576, keepalive_interval: float = 30.0, handshake_timeout: float = 10.0, max_buffer: int = 4194304, is_secure: bool = False, max_buffered_amount: int = 8388608)

Bases: ReadWriteCloser

Production-ready WebSocket connection implementation with enhanced features: - Comprehensive error handling and custom exceptions - Connection statistics and activity tracking - Secure connection support (WSS) - Message validation and size limits - Automatic ping/pong handling - Browser compatibility optimizations - Activity monitoring - Secure connection support

  • Flow control and buffer management

  • Error handling and recovery

  • Connection state monitoring

  • Performance statistics

  • Ping/pong keepalive

  • Graceful shutdown

async close() None

Close the WebSocket connection. This method is idempotent.

Raises:

WebSocketConnectionError – If closing the connection fails

conn_state() dict[str, Any]

Return connection state information similar to Go’s ConnState() method.

Returns:

Dictionary containing connection state information

get_remote_address() tuple[str, int] | None

Get remote address from the WebSocket connection.

get_stats() dict[str, int | float]

Get connection statistics.

is_closed() bool

Check if the connection is closed

async read(n: int | None = None) bytes

Read data from the WebSocket connection.

This method blocks until data is available, similar to TCP streams. This is required for multistream negotiation which expects blocking reads.

When the WebSocket connection is closed by the peer, this method raises IOException with detailed information including the close code and reason, rather than returning empty bytes. This allows higher-level code (such as read_exactly()) to immediately detect connection closure and provide better error messages.

Args:

n: Number of bytes to read (None for all available)

Returns:

bytes: Received data

Raises:
  • IOException – If connection is closed or read fails. When the connection is closed by the peer, the exception includes the WebSocket close code and reason for better debugging.

  • WebSocketConnectionError – If WebSocket connection error occurs

  • WebSocketMessageError – If message processing fails

async write(data: bytes) None

Write data to the WebSocket connection.

Args:

data: The bytes to write

Raises:
exception libp2p.transport.websocket.connection.WebSocketConnectionError

Bases: IOException

Base class for WebSocket connection errors.

exception libp2p.transport.websocket.connection.WebSocketHandshakeError

Bases: WebSocketConnectionError

Error during WebSocket handshake.

exception libp2p.transport.websocket.connection.WebSocketMessageError

Bases: WebSocketConnectionError

Error processing WebSocket message.

exception libp2p.transport.websocket.connection.WebSocketProtocolError

Bases: WebSocketConnectionError

WebSocket protocol error.

class libp2p.transport.websocket.connection.WebSocketStats(created_at: ~datetime.datetime = <factory>, bytes_sent: int = 0, bytes_received: int = 0, messages_sent: int = 0, messages_received: int = 0, errors: int = 0, last_error: str | None = None, last_activity: ~datetime.datetime | None = None, protocol: str | None = None, is_secure: bool = False, connected_at: float = 1771284566.6868134, ping_rtt_ms: float = 0.0)

Bases: object

Statistics for a WebSocket connection.

bytes_received: int = 0
bytes_sent: int = 0
connected_at: float = 1771284566.6868134
created_at: datetime
errors: int = 0
is_secure: bool = False
last_activity: datetime | None = None
last_error: str | None = None
messages_received: int = 0
messages_sent: int = 0
ping_rtt_ms: float = 0.0
protocol: str | None = None
record_error(error: str) None

Record an error occurrence.

record_message_received(size: int) None

Record a message being received.

record_message_sent(size: int) None

Record a message being sent.

update_activity() None

Update last activity timestamp.

libp2p.transport.websocket.listener module

class libp2p.transport.websocket.listener.WebsocketListener(handler: Callable[[Any], Awaitable[None]], upgrader: TransportUpgrader, config: WebsocketListenerConfig | None = None, peer_id: ID | None = None)

Bases: IListener

Production-ready WebSocket listener with advanced features:

  • WS and WSS protocol support

  • Connection limits and tracking

  • Flow control and buffer management

  • Proper error handling and cleanup

  • TLS configuration

  • Configurable timeouts and limits

async close() None

Close the listener and all connections.

Raises:

WebSocketConnectionError – If closing connections fails

get_addrs() tuple[Multiaddr, ...]

Get listening addresses.

get_stats() dict[str, int]

Get listener statistics.

property is_closed: bool

Check if the listener is closed.

async listen(maddr: Multiaddr, nursery: Nursery) bool

Start listening for connections.

Args:

maddr: Multiaddr to listen on nursery: Trio nursery for managing tasks

Returns:

bool: True if listening started successfully

Raises:
  • OpenConnectionError – If listening fails, listener is closed, invalid WebSocket multiaddr, or connection limit reached

  • ValueError – If WSS (secure WebSocket) requires TLS configuration but none was provided

property listen_maddr: Multiaddr | None

Get the listening multiaddr.

class libp2p.transport.websocket.listener.WebsocketListenerConfig(tls_config: SSLContext | None = None, autotls_config: AutoTLSConfig | None = None, advanced_tls_config: WebSocketTLSConfig | None = None, max_connections: int = 1000, max_message_size: int = 33554432, handshake_timeout: float = 15.0, ping_interval: float = 20.0, ping_timeout: float = 10.0, close_timeout: float = 5.0)

Bases: object

Configuration for WebSocket listener.

advanced_tls_config: WebSocketTLSConfig | None = None
autotls_config: AutoTLSConfig | None = None
close_timeout: float = 5.0
handshake_timeout: float = 15.0
max_connections: int = 1000
max_message_size: int = 33554432
ping_interval: float = 20.0
ping_timeout: float = 10.0
tls_config: SSLContext | None = None

libp2p.transport.websocket.manager module

WebSocket connection manager for handling multiple connections.

class libp2p.transport.websocket.manager.WebSocketConnectionManager(max_connections: int = 1000, inactive_timeout: float = 300.0, cleanup_interval: float = 60.0)

Bases: object

Manages multiple WebSocket connections with features: - Connection pooling and cleanup - Statistics tracking - Resource limits - Automatic cleanup of inactive connections

async add_connection(conn_id: str, connection: P2PWebSocketConnection) None

Add a new connection to the manager.

Args:

conn_id: Unique connection identifier connection: WebSocket connection instance

Raises:

RuntimeError: If maximum connections reached

async close_all() None

Close all connections and stop the manager.

get_active_connections() set[str]

Get IDs of all active (non-closed) connections.

Returns:

Set[str]: Set of active connection IDs

async get_connection(conn_id: str) P2PWebSocketConnection | None

Get a connection by ID.

Args:

conn_id: Connection identifier

Returns:

Optional[P2PWebSocketConnection]: Connection if found, None otherwise

get_connection_stats() dict[str, dict[str, Any]]

Get statistics for all connections.

Returns:

Dict[str, Dict]: Connection statistics by connection ID

get_manager_stats() dict[str, Any]

Get overall connection manager statistics.

Returns:

Dict: Manager statistics

async remove_connection(conn_id: str) None

Remove a connection from the manager.

Args:

conn_id: Connection identifier to remove

libp2p.transport.websocket.multiaddr_utils module

WebSocket multiaddr parsing utilities.

class libp2p.transport.websocket.multiaddr_utils.ParsedWebSocketMultiaddr(is_wss: bool, sni: str | None, rest_multiaddr: Multiaddr)

Bases: NamedTuple

Parsed WebSocket multiaddr information.

is_wss: bool

Alias for field number 0

rest_multiaddr: Multiaddr

Alias for field number 2

sni: str | None

Alias for field number 1

libp2p.transport.websocket.multiaddr_utils.is_valid_websocket_multiaddr(maddr: Multiaddr) bool

Validate that a multiaddr has a valid WebSocket structure.

Parameters:

maddr – The multiaddr to validate

Returns:

True if valid WebSocket structure, False otherwise

libp2p.transport.websocket.multiaddr_utils.parse_websocket_multiaddr(maddr: Multiaddr) ParsedWebSocketMultiaddr

Parse a WebSocket multiaddr and extract security information.

Parameters:

maddr – The multiaddr to parse

Returns:

Parsed WebSocket multiaddr information

Raises:

ValueError – If the multiaddr is not a valid WebSocket multiaddr

libp2p.transport.websocket.proxy module

SOCKS proxy connection manager for WebSocket transport. Supports SOCKS4, SOCKS4a, and SOCKS5 protocols with async/await.

class libp2p.transport.websocket.proxy.SOCKSConnectionManager(proxy_url: str, auth: tuple[str, str] | None = None, timeout: float = 10.0)

Bases: object

SOCKS proxy connection manager for WebSocket transport.

Supports SOCKS4, SOCKS4a, and SOCKS5 protocols with trio async/await. This implementation is fully compatible with trio’s event loop.

Example:
>>> from libp2p.transport.websocket.proxy import SOCKSConnectionManager
>>> import trio
>>> manager = SOCKSConnectionManager('socks5://localhost:1080')
>>> # Note: This is an async example, so it can't be run in doctest
>>> # async with trio.open_nursery() as nursery:
>>> #     ws = await manager.create_connection(nursery, 'example.com', 443)
async create_connection(nursery: Nursery, host: str, port: int, ssl_context: SSLContext | None = None) Any

Create a WebSocket connection through SOCKS proxy.

This method: 1. Establishes SOCKS tunnel to target host 2. Creates WebSocket connection over the tunnel 3. Returns trio-websocket connection object

Args:

nursery: Trio nursery for managing connection lifecycle host: Target WebSocket host port: Target WebSocket port ssl_context: Optional SSL context for WSS connections

Returns:

WebSocket connection object (trio-websocket)

Raises:

ConnectionError: If SOCKS connection or WebSocket upgrade fails trio.TooSlowError: If connection times out

get_proxy_info() dict[str, Any]

Get proxy configuration information.

Returns:

Dictionary with proxy configuration details

libp2p.transport.websocket.proxy_env module

Environment variable proxy configuration support. Mimics Go’s http.ProxyFromEnvironment functionality.

libp2p.transport.websocket.proxy_env.get_proxy_from_environment(url: str) str | None

Get proxy URL from environment variables with uppercase precedence.

Mimics Go’s http.ProxyFromEnvironment behavior: - Uses HTTP_PROXY for ws:// URLs - Uses HTTPS_PROXY for wss:// URLs - Checks both lowercase and uppercase variants (uppercase takes precedence) - Returns None if NO_PROXY matches the target

Platform-specific behavior: - On Linux/Unix: HTTP_PROXY and http_proxy are distinct variables, uppercase is checked first - On Windows: Environment variables are case-insensitive, so HTTP_PROXY and http_proxy are treated as the same variable; whichever was set last takes effect

Args:

url: The WebSocket URL being dialed (ws:// or wss://)

Returns:

Proxy URL string or None if no proxy configured

libp2p.transport.websocket.proxy_env.validate_proxy_url(proxy_url: str) bool

Validate that a proxy URL has a supported scheme.

Args:

proxy_url: Proxy URL to validate

Returns:

True if valid and supported, False otherwise

libp2p.transport.websocket.transport module

class libp2p.transport.websocket.transport.WebsocketConfig(tls_client_config: SSLContext | None = None, tls_server_config: SSLContext | None = None, tls_config: WebSocketTLSConfig | None = None, autotls_config: AutoTLSConfig | None = None, handshake_timeout: float = 15.0, max_buffered_amount: int = 4194304, max_connections: int = 1000, proxy_url: str | None = None, proxy_auth: tuple[str, str] | None = None, ping_interval: float = 20.0, ping_timeout: float = 10.0, close_timeout: float = 5.0, max_message_size: int = 33554432)

Bases: object

Configuration options for WebSocket transport.

autotls_config: AutoTLSConfig | None = None
close_timeout: float = 5.0
handshake_timeout: float = 15.0
max_buffered_amount: int = 4194304
max_connections: int = 1000
max_message_size: int = 33554432
ping_interval: float = 20.0
ping_timeout: float = 10.0
proxy_auth: tuple[str, str] | None = None
proxy_url: str | None = None
tls_client_config: SSLContext | None = None
tls_config: WebSocketTLSConfig | None = None
tls_server_config: SSLContext | None = None
validate() None

Validate configuration settings.

class libp2p.transport.websocket.transport.WebsocketTransport(upgrader: TransportUpgrader, config: WebsocketConfig | None = None, tls_client_config: SSLContext | None = None, tls_server_config: SSLContext | None = None, handshake_timeout: float | None = None)

Bases: ITransport

Libp2p WebSocket transport implementation with production features:

Features: - WS and WSS protocol support with configurable TLS - Connection management with limits and tracking - Flow control and buffer management - SOCKS5 proxy support - Proper error handling and connection cleanup - Configurable timeouts and limits - Connection state monitoring - Concurrent connection handling

async can_dial(maddr: Multiaddr) bool

Check if we can dial the given multiaddr.

create_listener(handler: Callable[[ReadWriteCloser], Awaitable[None]]) IListener

Create a WebSocket listener with the given handler.

Args:

handler: Connection handler function

Returns:

A WebSocket listener

Raises:

ValueError – If configuration validation fails

async dial(maddr: Multiaddr) RawConnection

Dial a WebSocket connection to the given multiaddr.

Args:

maddr: The multiaddr to dial (e.g., /ip4/127.0.0.1/tcp/8000/ws)

Returns:

An upgraded RawConnection

Raises:
  • OpenConnectionError – If connection fails, cannot dial the multiaddr, connection upgrade fails, or maximum connections reached

  • ValueError – If multiaddr is invalid or cannot be parsed

async get_connections() dict[str, P2PWebSocketConnection]

Get all active connections.

get_listeners() set[WebsocketListener]

Get all active listeners.

get_stats() dict[str, int]

Get transport statistics.

resolve(maddr: Multiaddr) list[Multiaddr]

Resolve a WebSocket multiaddr to its concrete addresses. Currently, just validates and returns the input multiaddr.

Args:

maddr: The multiaddr to resolve

Returns:

List containing the original multiaddr

set_background_nursery(nursery: Nursery) None

Set the nursery to use for background tasks (called by Swarm).

set_peer_id(peer_id: ID) None

Set the peer ID of the host.

libp2p.transport.websocket.transport.WithAdvancedTLS(cert_file: str | None = None, key_file: str | None = None, ca_file: str | None = None, verify_peer: bool = True, verify_hostname: bool = True) WebsocketConfig

Create a WebsocketConfig with advanced TLS settings.

Args:

cert_file: Certificate file path key_file: Private key file path ca_file: CA certificate file path verify_peer: Verify peer certificates verify_hostname: Verify hostname

Returns:

WebsocketConfig with advanced TLS settings

Example:
>>> from libp2p.transport.websocket import WithAdvancedTLS
>>> config = WithAdvancedTLS(
...     cert_file="server.crt",
...     key_file="server.key",
...     ca_file="ca.crt"
... )
>>> # Note: Creating transport would require actual certificate files
>>> # transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithAutoTLS(domain: str = 'libp2p.local', storage_path: str = 'autotls-certs', renewal_threshold_hours: int = 24, cert_validity_days: int = 90) WebsocketConfig

Create a WebsocketConfig with AutoTLS enabled.

Args:

domain: Default domain for certificates storage_path: Path for certificate storage renewal_threshold_hours: Hours before expiry to renew certificate cert_validity_days: Certificate validity period in days

Returns:

WebsocketConfig with AutoTLS enabled

Example:
>>> from libp2p.transport.websocket import WithAutoTLS, WebsocketTransport
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithAutoTLS(domain="myapp.local")
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithHandshakeTimeout(timeout: float) WebsocketConfig

Create a WebsocketConfig with custom handshake timeout.

Args:

timeout: Handshake timeout in seconds

Returns:

WebsocketConfig with timeout configured

Example:
>>> from libp2p.transport.websocket import (
...     WithHandshakeTimeout, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithHandshakeTimeout(30.0)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithMaxConnections(max_connections: int) WebsocketConfig

Create a WebsocketConfig with custom connection limit.

Args:

max_connections: Maximum number of concurrent connections

Returns:

WebsocketConfig with connection limit configured

Example:
>>> from libp2p.transport.websocket import (
...     WithMaxConnections, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithMaxConnections(500)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithProxy(proxy_url: str, auth: tuple[str, str] | None = None) WebsocketConfig

Create a WebsocketConfig with SOCKS proxy settings.

Convenience method similar to go-libp2p’s WithTLSClientConfig.

Args:

proxy_url: SOCKS proxy URL (e.g., ‘socks5://localhost:1080’) auth: Optional (username, password) tuple for proxy authentication

Returns:

WebsocketConfig with proxy settings configured

Example:
>>> from libp2p.transport.websocket import WithProxy, WebsocketTransport
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithProxy('socks5://proxy.corp.com:1080', ('user', 'pass'))
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithProxyFromEnvironment() WebsocketConfig

Create a WebsocketConfig that will use proxy from environment variables.

This is the default behavior, but this method makes it explicit. Reads HTTP_PROXY for ws:// and HTTPS_PROXY for wss:// connections.

Returns:

WebsocketConfig with no explicit proxy (will use environment)

Example:
>>> import os
>>> from libp2p.transport.websocket import (
...     WithProxyFromEnvironment, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> os.environ['HTTPS_PROXY'] = 'socks5://localhost:1080'
>>> config = WithProxyFromEnvironment()
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithTLSClientConfig(tls_config: SSLContext) WebsocketConfig

Create a WebsocketConfig with custom TLS client configuration.

Args:

tls_config: SSL context for client TLS configuration

Returns:

WebsocketConfig with TLS settings configured

Example:
>>> import ssl
>>> from libp2p.transport.websocket import (
...     WithTLSClientConfig, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> ctx = ssl.create_default_context()
>>> ctx.check_hostname = False
>>> config = WithTLSClientConfig(ctx)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.transport.WithTLSServerConfig(tls_config: SSLContext) WebsocketConfig

Create a WebsocketConfig with custom TLS server configuration.

Args:

tls_config: SSL context for server TLS configuration

Returns:

WebsocketConfig with server TLS settings configured

Example:
>>> import ssl
>>> from libp2p.transport.websocket import WithTLSServerConfig
>>> ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
>>> # Note: This would fail in doctest due to missing files
>>> # ctx.load_cert_chain('server.crt', 'server.key')
>>> config = WithTLSServerConfig(ctx)
libp2p.transport.websocket.transport.combine_configs(*configs: WebsocketConfig) WebsocketConfig

Combine multiple WebsocketConfig objects.

Later configs override earlier configs for non-None values.

Args:

configs: Variable number of WebsocketConfig objects

Returns:

Combined WebsocketConfig

Example:
>>> from libp2p.transport.websocket import (
...     WithProxy, WithTLSClientConfig, WithHandshakeTimeout,
...     combine_configs, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> import ssl
>>> my_ssl_context = ssl.create_default_context()
>>> upgrader = TransportUpgrader({}, {})
>>> proxy_config = WithProxy('socks5://localhost:1080')
>>> tls_config = WithTLSClientConfig(my_ssl_context)
>>> timeout_config = WithHandshakeTimeout(30.0)
>>> final = combine_configs(proxy_config, tls_config, timeout_config)
>>> transport = WebsocketTransport(upgrader, config=final)

Module contents

WebSocket transport for py-libp2p.

class libp2p.transport.websocket.AutoTLSConfig(enabled: bool = True, storage_path: str | Path = 'autotls-certs', renewal_threshold_hours: int = 24, cert_validity_days: int = 90, default_domain: str = 'libp2p.local', wildcard_domain: bool = True)

Bases: object

Configuration for AutoTLS functionality.

validate() None

Validate configuration.

class libp2p.transport.websocket.P2PWebSocketConnection(ws_connection: WebSocketConnection | Any, local_addr: Multiaddr | None = None, remote_addr: Multiaddr | None = None, ssl_context: SSLContext | None = None, max_message_size: int = 1048576, keepalive_interval: float = 30.0, handshake_timeout: float = 10.0, max_buffer: int = 4194304, is_secure: bool = False, max_buffered_amount: int = 8388608)

Bases: ReadWriteCloser

Production-ready WebSocket connection implementation with enhanced features: - Comprehensive error handling and custom exceptions - Connection statistics and activity tracking - Secure connection support (WSS) - Message validation and size limits - Automatic ping/pong handling - Browser compatibility optimizations - Activity monitoring - Secure connection support

  • Flow control and buffer management

  • Error handling and recovery

  • Connection state monitoring

  • Performance statistics

  • Ping/pong keepalive

  • Graceful shutdown

async close() None

Close the WebSocket connection. This method is idempotent.

Raises:

WebSocketConnectionError – If closing the connection fails

conn_state() dict[str, Any]

Return connection state information similar to Go’s ConnState() method.

Returns:

Dictionary containing connection state information

get_remote_address() tuple[str, int] | None

Get remote address from the WebSocket connection.

get_stats() dict[str, int | float]

Get connection statistics.

is_closed() bool

Check if the connection is closed

async read(n: int | None = None) bytes

Read data from the WebSocket connection.

This method blocks until data is available, similar to TCP streams. This is required for multistream negotiation which expects blocking reads.

When the WebSocket connection is closed by the peer, this method raises IOException with detailed information including the close code and reason, rather than returning empty bytes. This allows higher-level code (such as read_exactly()) to immediately detect connection closure and provide better error messages.

Args:

n: Number of bytes to read (None for all available)

Returns:

bytes: Received data

Raises:
  • IOException – If connection is closed or read fails. When the connection is closed by the peer, the exception includes the WebSocket close code and reason for better debugging.

  • WebSocketConnectionError – If WebSocket connection error occurs

  • WebSocketMessageError – If message processing fails

async write(data: bytes) None

Write data to the WebSocket connection.

Args:

data: The bytes to write

Raises:
libp2p.transport.websocket.WebSocketTransport

alias of WebsocketTransport

class libp2p.transport.websocket.WebsocketConfig(tls_client_config: SSLContext | None = None, tls_server_config: SSLContext | None = None, tls_config: WebSocketTLSConfig | None = None, autotls_config: AutoTLSConfig | None = None, handshake_timeout: float = 15.0, max_buffered_amount: int = 4194304, max_connections: int = 1000, proxy_url: str | None = None, proxy_auth: tuple[str, str] | None = None, ping_interval: float = 20.0, ping_timeout: float = 10.0, close_timeout: float = 5.0, max_message_size: int = 33554432)

Bases: object

Configuration options for WebSocket transport.

autotls_config: AutoTLSConfig | None = None
close_timeout: float = 5.0
handshake_timeout: float = 15.0
max_buffered_amount: int = 4194304
max_connections: int = 1000
max_message_size: int = 33554432
ping_interval: float = 20.0
ping_timeout: float = 10.0
proxy_auth: tuple[str, str] | None = None
proxy_url: str | None = None
tls_client_config: SSLContext | None = None
tls_config: WebSocketTLSConfig | None = None
tls_server_config: SSLContext | None = None
validate() None

Validate configuration settings.

class libp2p.transport.websocket.WebsocketListener(handler: Callable[[Any], Awaitable[None]], upgrader: TransportUpgrader, config: WebsocketListenerConfig | None = None, peer_id: ID | None = None)

Bases: IListener

Production-ready WebSocket listener with advanced features:

  • WS and WSS protocol support

  • Connection limits and tracking

  • Flow control and buffer management

  • Proper error handling and cleanup

  • TLS configuration

  • Configurable timeouts and limits

async close() None

Close the listener and all connections.

Raises:

WebSocketConnectionError – If closing connections fails

get_addrs() tuple[Multiaddr, ...]

Get listening addresses.

get_stats() dict[str, int]

Get listener statistics.

property is_closed: bool

Check if the listener is closed.

async listen(maddr: Multiaddr, nursery: Nursery) bool

Start listening for connections.

Args:

maddr: Multiaddr to listen on nursery: Trio nursery for managing tasks

Returns:

bool: True if listening started successfully

Raises:
  • OpenConnectionError – If listening fails, listener is closed, invalid WebSocket multiaddr, or connection limit reached

  • ValueError – If WSS (secure WebSocket) requires TLS configuration but none was provided

property listen_maddr: Multiaddr | None

Get the listening multiaddr.

class libp2p.transport.websocket.WebsocketListenerConfig(tls_config: SSLContext | None = None, autotls_config: AutoTLSConfig | None = None, advanced_tls_config: WebSocketTLSConfig | None = None, max_connections: int = 1000, max_message_size: int = 33554432, handshake_timeout: float = 15.0, ping_interval: float = 20.0, ping_timeout: float = 10.0, close_timeout: float = 5.0)

Bases: object

Configuration for WebSocket listener.

advanced_tls_config: WebSocketTLSConfig | None = None
autotls_config: AutoTLSConfig | None = None
close_timeout: float = 5.0
handshake_timeout: float = 15.0
max_connections: int = 1000
max_message_size: int = 33554432
ping_interval: float = 20.0
ping_timeout: float = 10.0
tls_config: SSLContext | None = None
class libp2p.transport.websocket.WebsocketTransport(upgrader: TransportUpgrader, config: WebsocketConfig | None = None, tls_client_config: SSLContext | None = None, tls_server_config: SSLContext | None = None, handshake_timeout: float | None = None)

Bases: ITransport

Libp2p WebSocket transport implementation with production features:

Features: - WS and WSS protocol support with configurable TLS - Connection management with limits and tracking - Flow control and buffer management - SOCKS5 proxy support - Proper error handling and connection cleanup - Configurable timeouts and limits - Connection state monitoring - Concurrent connection handling

async can_dial(maddr: Multiaddr) bool

Check if we can dial the given multiaddr.

create_listener(handler: Callable[[ReadWriteCloser], Awaitable[None]]) IListener

Create a WebSocket listener with the given handler.

Args:

handler: Connection handler function

Returns:

A WebSocket listener

Raises:

ValueError – If configuration validation fails

async dial(maddr: Multiaddr) RawConnection

Dial a WebSocket connection to the given multiaddr.

Args:

maddr: The multiaddr to dial (e.g., /ip4/127.0.0.1/tcp/8000/ws)

Returns:

An upgraded RawConnection

Raises:
  • OpenConnectionError – If connection fails, cannot dial the multiaddr, connection upgrade fails, or maximum connections reached

  • ValueError – If multiaddr is invalid or cannot be parsed

async get_connections() dict[str, P2PWebSocketConnection]

Get all active connections.

get_listeners() set[WebsocketListener]

Get all active listeners.

get_stats() dict[str, int]

Get transport statistics.

resolve(maddr: Multiaddr) list[Multiaddr]

Resolve a WebSocket multiaddr to its concrete addresses. Currently, just validates and returns the input multiaddr.

Args:

maddr: The multiaddr to resolve

Returns:

List containing the original multiaddr

set_background_nursery(nursery: Nursery) None

Set the nursery to use for background tasks (called by Swarm).

set_peer_id(peer_id: ID) None

Set the peer ID of the host.

libp2p.transport.websocket.WithAdvancedTLS(cert_file: str | None = None, key_file: str | None = None, ca_file: str | None = None, verify_peer: bool = True, verify_hostname: bool = True) WebsocketConfig

Create a WebsocketConfig with advanced TLS settings.

Args:

cert_file: Certificate file path key_file: Private key file path ca_file: CA certificate file path verify_peer: Verify peer certificates verify_hostname: Verify hostname

Returns:

WebsocketConfig with advanced TLS settings

Example:
>>> from libp2p.transport.websocket import WithAdvancedTLS
>>> config = WithAdvancedTLS(
...     cert_file="server.crt",
...     key_file="server.key",
...     ca_file="ca.crt"
... )
>>> # Note: Creating transport would require actual certificate files
>>> # transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithAutoTLS(domain: str = 'libp2p.local', storage_path: str = 'autotls-certs', renewal_threshold_hours: int = 24, cert_validity_days: int = 90) WebsocketConfig

Create a WebsocketConfig with AutoTLS enabled.

Args:

domain: Default domain for certificates storage_path: Path for certificate storage renewal_threshold_hours: Hours before expiry to renew certificate cert_validity_days: Certificate validity period in days

Returns:

WebsocketConfig with AutoTLS enabled

Example:
>>> from libp2p.transport.websocket import WithAutoTLS, WebsocketTransport
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithAutoTLS(domain="myapp.local")
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithHandshakeTimeout(timeout: float) WebsocketConfig

Create a WebsocketConfig with custom handshake timeout.

Args:

timeout: Handshake timeout in seconds

Returns:

WebsocketConfig with timeout configured

Example:
>>> from libp2p.transport.websocket import (
...     WithHandshakeTimeout, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithHandshakeTimeout(30.0)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithMaxConnections(max_connections: int) WebsocketConfig

Create a WebsocketConfig with custom connection limit.

Args:

max_connections: Maximum number of concurrent connections

Returns:

WebsocketConfig with connection limit configured

Example:
>>> from libp2p.transport.websocket import (
...     WithMaxConnections, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithMaxConnections(500)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithProxy(proxy_url: str, auth: tuple[str, str] | None = None) WebsocketConfig

Create a WebsocketConfig with SOCKS proxy settings.

Convenience method similar to go-libp2p’s WithTLSClientConfig.

Args:

proxy_url: SOCKS proxy URL (e.g., ‘socks5://localhost:1080’) auth: Optional (username, password) tuple for proxy authentication

Returns:

WebsocketConfig with proxy settings configured

Example:
>>> from libp2p.transport.websocket import WithProxy, WebsocketTransport
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> config = WithProxy('socks5://proxy.corp.com:1080', ('user', 'pass'))
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithProxyFromEnvironment() WebsocketConfig

Create a WebsocketConfig that will use proxy from environment variables.

This is the default behavior, but this method makes it explicit. Reads HTTP_PROXY for ws:// and HTTPS_PROXY for wss:// connections.

Returns:

WebsocketConfig with no explicit proxy (will use environment)

Example:
>>> import os
>>> from libp2p.transport.websocket import (
...     WithProxyFromEnvironment, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> os.environ['HTTPS_PROXY'] = 'socks5://localhost:1080'
>>> config = WithProxyFromEnvironment()
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithTLSClientConfig(tls_config: SSLContext) WebsocketConfig

Create a WebsocketConfig with custom TLS client configuration.

Args:

tls_config: SSL context for client TLS configuration

Returns:

WebsocketConfig with TLS settings configured

Example:
>>> import ssl
>>> from libp2p.transport.websocket import (
...     WithTLSClientConfig, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> upgrader = TransportUpgrader({}, {})
>>> ctx = ssl.create_default_context()
>>> ctx.check_hostname = False
>>> config = WithTLSClientConfig(ctx)
>>> transport = WebsocketTransport(upgrader, config=config)
libp2p.transport.websocket.WithTLSServerConfig(tls_config: SSLContext) WebsocketConfig

Create a WebsocketConfig with custom TLS server configuration.

Args:

tls_config: SSL context for server TLS configuration

Returns:

WebsocketConfig with server TLS settings configured

Example:
>>> import ssl
>>> from libp2p.transport.websocket import WithTLSServerConfig
>>> ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
>>> # Note: This would fail in doctest due to missing files
>>> # ctx.load_cert_chain('server.crt', 'server.key')
>>> config = WithTLSServerConfig(ctx)
libp2p.transport.websocket.combine_configs(*configs: WebsocketConfig) WebsocketConfig

Combine multiple WebsocketConfig objects.

Later configs override earlier configs for non-None values.

Args:

configs: Variable number of WebsocketConfig objects

Returns:

Combined WebsocketConfig

Example:
>>> from libp2p.transport.websocket import (
...     WithProxy, WithTLSClientConfig, WithHandshakeTimeout,
...     combine_configs, WebsocketTransport
... )
>>> from libp2p.transport.upgrader import TransportUpgrader
>>> import ssl
>>> my_ssl_context = ssl.create_default_context()
>>> upgrader = TransportUpgrader({}, {})
>>> proxy_config = WithProxy('socks5://localhost:1080')
>>> tls_config = WithTLSClientConfig(my_ssl_context)
>>> timeout_config = WithHandshakeTimeout(30.0)
>>> final = combine_configs(proxy_config, tls_config, timeout_config)
>>> transport = WebsocketTransport(upgrader, config=final)