WebSocket Transport Examples

This guide demonstrates how to use the WebSocket transport in py-libp2p, including WS (WebSocket) and WSS (WebSocket Secure) protocols, SOCKS proxy support, AutoTLS, and advanced configuration options.

Quick Start

Basic WebSocket transport setup:

from libp2p import new_host
from libp2p.transport.websocket import WebsocketTransport, WebsocketConfig
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.custom_types import TProtocol
from libp2p.crypto.rsa import create_new_key_pair
from multiaddr import Multiaddr

# Generate a key pair
key_pair = create_new_key_pair()

# Create upgrader with security and muxer
upgrader = TransportUpgrader(
    secure_transports_by_protocol={
        TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
    },
    muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux}
)

# Create WebSocket transport
config = WebsocketConfig()
transport = WebsocketTransport(upgrader, config=config)

# Create host with WebSocket transport
# Note: new_host handles transport creation internally if not overridden,
# but this shows how to set it up manually if needed.
host = new_host(
    listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")]
)

WS vs WSS

WS (Insecure WebSocket):

Use for development and testing. No TLS encryption:

ws_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
host = new_host(listen_addrs=[ws_addr])

WSS (Secure WebSocket):

Use for production. Requires TLS configuration:

import ssl

# Create TLS context
tls_context = ssl.create_default_context()
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE  # For testing only

# Configure transport with TLS
config = WebsocketConfig(tls_server_config=tls_context)
transport = WebsocketTransport(upgrader, config=config)

wss_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/wss")
host = new_host(listen_addrs=[wss_addr])

SOCKS Proxy Configuration

Using Proxy Factory Function:

from libp2p.transport.websocket import WithProxy

# Configure with SOCKS5 proxy
config = WithProxy(
    proxy_url="socks5://proxy.example.com:1080",
    auth=("username", "password")  # Optional
)
transport = WebsocketTransport(upgrader, config=config)

Using Environment Variables:

from libp2p.transport.websocket import WithProxyFromEnvironment

# Reads HTTP_PROXY or HTTPS_PROXY environment variables
config = WithProxyFromEnvironment()
transport = WebsocketTransport(upgrader, config=config)

Environment Variable Format:

export HTTP_PROXY=socks5://proxy.example.com:1080
export HTTPS_PROXY=socks5://proxy.example.com:1080

AutoTLS Configuration

AutoTLS automatically generates and manages TLS certificates for browser integration:

from libp2p.transport.websocket import WithAutoTLS, AutoTLSConfig

autotls_config = AutoTLSConfig(
    enabled=True,
    default_domain="localhost",
    cert_validity_days=365
)

config = WithAutoTLS(autotls_config)
transport = WebsocketTransport(upgrader, config=config)

Browser Integration:

AutoTLS is particularly useful for browser-based applications where you need trusted certificates for WebSocket connections.

Advanced TLS Configuration

For production deployments with custom certificates:

from libp2p.transport.websocket import WithAdvancedTLS
from libp2p.transport.websocket.tls_config import (
    TLSConfig,
    CertificateValidationMode
)

tls_config = TLSConfig(
    cert_file="path/to/cert.pem",
    key_file="path/to/key.pem",
    ca_file="path/to/ca.pem",
    validation_mode=CertificateValidationMode.STRICT
)

config = WithAdvancedTLS(tls_config)
transport = WebsocketTransport(upgrader, config=config)

Connection Management

Configure connection limits and timeouts:

from libp2p.transport.websocket import (
    WithMaxConnections,
    WithHandshakeTimeout
)

config = WebsocketConfig(
    max_connections=1000,
    handshake_timeout=15.0,
    max_buffered_amount=4 * 1024 * 1024,  # 4MB
    max_message_size=32 * 1024 * 1024     # 32MB
)

transport = WebsocketTransport(upgrader, config=config)

Monitoring Connection Statistics:

# Get transport statistics
stats = transport.get_stats()
print(f"Total connections: {stats['total_connections']}")
print(f"Current connections: {stats['current_connections']}")
print(f"Failed connections: {stats['failed_connections']}")

# Get connection details
connections = await transport.get_connections()
for conn_id, conn in connections.items():
    conn_stats = conn.get_stats()
    print(f"Connection {conn_id}: {conn_stats}")

Complete Example

A complete example demonstrating WebSocket transport with all features:

  1import argparse
  2import logging
  3import signal
  4import sys
  5import traceback
  6
  7import multiaddr
  8import trio
  9
 10from libp2p.abc import INotifee
 11from libp2p.crypto.secp256k1 import create_new_key_pair
 12from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
 13from libp2p.custom_types import TProtocol
 14from libp2p.host.basic_host import BasicHost
 15from libp2p.network.swarm import Swarm
 16from libp2p.peer.id import ID
 17from libp2p.peer.peerinfo import info_from_p2p_addr
 18from libp2p.peer.peerstore import PeerStore
 19from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
 20from libp2p.security.noise.transport import (
 21    PROTOCOL_ID as NOISE_PROTOCOL_ID,
 22    Transport as NoiseTransport,
 23)
 24from libp2p.stream_muxer.yamux.yamux import Yamux
 25from libp2p.transport.upgrader import TransportUpgrader
 26from libp2p.transport.websocket.transport import WebsocketTransport
 27
 28# Enable debug logging
 29logging.basicConfig(level=logging.DEBUG)
 30
 31logger = logging.getLogger("libp2p.websocket-example")
 32
 33
 34# Suppress KeyboardInterrupt by handling SIGINT directly
 35def signal_handler(signum, frame):
 36    print("✅ Clean exit completed.")
 37    sys.exit(0)
 38
 39
 40signal.signal(signal.SIGINT, signal_handler)
 41
 42# Simple echo protocol
 43ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0")
 44
 45
 46async def echo_handler(stream):
 47    """Simple echo handler that echoes back any data received."""
 48    try:
 49        data = await stream.read(1024)
 50        if data:
 51            message = data.decode("utf-8", errors="replace")
 52            print(f"📥 Received: {message}")
 53            print(f"📤 Echoing back: {message}")
 54            await stream.write(data)
 55        await stream.close()
 56    except Exception as e:
 57        logger.error(f"Echo handler error: {e}")
 58        await stream.close()
 59
 60
 61def create_websocket_host(listen_addrs=None, use_plaintext=False):
 62    """Create a host with WebSocket transport."""
 63    # Create key pair and peer store
 64    key_pair = create_new_key_pair()
 65    peer_id = ID.from_pubkey(key_pair.public_key)
 66    peer_store = PeerStore()
 67    peer_store.add_key_pair(peer_id, key_pair)
 68
 69    if use_plaintext:
 70        # Create transport upgrader with plaintext security
 71        upgrader = TransportUpgrader(
 72            secure_transports_by_protocol={
 73                TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
 74            },
 75            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
 76        )
 77    else:
 78        # Separate X25519 keypair for Noise static DH (libp2p Noise spec); identity
 79        # uses secp256k1 key_pair above.
 80        noise_key_pair = create_new_x25519_key_pair()
 81
 82        # Create Noise transport
 83        noise_transport = NoiseTransport(
 84            libp2p_keypair=key_pair,
 85            noise_privkey=noise_key_pair.private_key,
 86            early_data=None,
 87        )
 88
 89        # Create transport upgrader with Noise security
 90        upgrader = TransportUpgrader(
 91            secure_transports_by_protocol={
 92                TProtocol(NOISE_PROTOCOL_ID): noise_transport
 93            },
 94            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
 95        )
 96
 97    # Create WebSocket transport
 98    transport = WebsocketTransport(upgrader)
 99
100    # Create swarm and host
101    swarm = Swarm(peer_id, peer_store, upgrader, transport)
102    host = BasicHost(swarm)
103
104    return host
105
106
107async def run(port: int, destination: str, use_plaintext: bool = False) -> None:
108    localhost_ip = "0.0.0.0"
109
110    if not destination:
111        # Create first host (listener) with WebSocket transport
112        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")
113
114        try:
115            host = create_websocket_host(use_plaintext=use_plaintext)
116            logger.debug(f"Created host with use_plaintext={use_plaintext}")
117
118            # Set up echo handler
119            host.set_stream_handler(ECHO_PROTOCOL_ID, echo_handler)
120
121            # Add connection event handlers for debugging
122            class DebugNotifee(INotifee):
123                async def opened_stream(self, network, stream):
124                    pass
125
126                async def closed_stream(self, network, stream):
127                    pass
128
129                async def connected(self, network, conn):
130                    print(
131                        f"🔗 New libp2p connection established: "
132                        f"{conn.muxed_conn.peer_id}"
133                    )
134                    if hasattr(conn.muxed_conn, "get_security_protocol"):
135                        security = conn.muxed_conn.get_security_protocol()
136                    else:
137                        security = "Unknown"
138
139                    print(f"   Security: {security}")
140
141                async def disconnected(self, network, conn):
142                    print(f"🔌 libp2p connection closed: {conn.muxed_conn.peer_id}")
143
144                async def listen(self, network, multiaddr):
145                    pass
146
147                async def listen_close(self, network, multiaddr):
148                    pass
149
150            host.get_network().register_notifee(DebugNotifee())
151
152            # Create a cancellation token for clean shutdown
153            cancel_scope = trio.CancelScope()
154
155            async def signal_handler():
156                with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as (
157                    signal_receiver
158                ):
159                    async for sig in signal_receiver:
160                        print(f"\n🛑 Received signal {sig}")
161                        print("✅ Shutting down WebSocket server...")
162                        cancel_scope.cancel()
163                        return
164
165            async with (
166                host.run(listen_addrs=[listen_addr]),
167                trio.open_nursery() as (nursery),
168            ):
169                # Start the peer-store cleanup task
170                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
171
172                # Start the signal handler
173                nursery.start_soon(signal_handler)
174
175                # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
176                # connections
177                addrs = host.get_addrs()
178                logger.debug(f"Host addresses: {addrs}")
179                if not addrs:
180                    print("❌ Error: No addresses found for the host")
181                    print("Debug: host.get_addrs() returned empty list")
182                    return
183
184                server_addr = str(addrs[0])
185                client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
186
187                print("🌐 WebSocket Server Started Successfully!")
188                print("=" * 50)
189                print(f"📍 Server Address: {client_addr}")
190                print("🔧 Protocol: /echo/1.0.0")
191                print("🚀 Transport: WebSocket (/ws)")
192                print()
193                print("📋 To test the connection, run this in another terminal:")
194                plaintext_flag = " --plaintext" if use_plaintext else ""
195                print(f"   python websocket_demo.py -d {client_addr}{plaintext_flag}")
196                print()
197                print("⏳ Waiting for incoming WebSocket connections...")
198                print("─" * 50)
199
200                # Add a custom handler to show connection events
201                async def custom_echo_handler(stream):
202                    peer_id = stream.muxed_conn.peer_id
203                    print("\n🔗 New WebSocket Connection!")
204                    print(f"   Peer ID: {peer_id}")
205                    print("   Protocol: /echo/1.0.0")
206
207                    # Show remote address in multiaddr format
208                    try:
209                        remote_address = stream.get_remote_address()
210                        if remote_address:
211                            print(f"   Remote: {remote_address}")
212                    except Exception:
213                        print("   Remote: Unknown")
214
215                    print("   ─" * 40)
216
217                    # Call the original handler
218                    await echo_handler(stream)
219
220                    print("   ─" * 40)
221                    print(f"✅ Echo request completed for peer: {peer_id}")
222                    print()
223
224                # Replace the handler with our custom one
225                host.set_stream_handler(ECHO_PROTOCOL_ID, custom_echo_handler)
226
227                # Wait indefinitely or until cancelled
228                with cancel_scope:
229                    await trio.sleep_forever()
230
231        except Exception as e:
232            print(f"❌ Error creating WebSocket server: {e}")
233            traceback.print_exc()
234            return
235
236    else:
237        # Create second host (dialer) with WebSocket transport
238        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")
239
240        try:
241            # Create a single host for client operations
242            host = create_websocket_host(use_plaintext=use_plaintext)
243
244            # Start the host for client operations
245            async with (
246                host.run(listen_addrs=[listen_addr]),
247                trio.open_nursery() as (nursery),
248            ):
249                # Start the peer-store cleanup task
250                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
251
252                # Add connection event handlers for debugging
253                class ClientDebugNotifee(INotifee):
254                    async def opened_stream(self, network, stream):
255                        pass
256
257                    async def closed_stream(self, network, stream):
258                        pass
259
260                    async def connected(self, network, conn):
261                        print(
262                            f"🔗 Client: libp2p connection established: "
263                            f"{conn.muxed_conn.peer_id}"
264                        )
265
266                    async def disconnected(self, network, conn):
267                        print(
268                            f"🔌 Client: libp2p connection closed: "
269                            f"{conn.muxed_conn.peer_id}"
270                        )
271
272                    async def listen(self, network, multiaddr):
273                        pass
274
275                    async def listen_close(self, network, multiaddr):
276                        pass
277
278                host.get_network().register_notifee(ClientDebugNotifee())
279
280                maddr = multiaddr.Multiaddr(destination)
281                info = info_from_p2p_addr(maddr)
282                print("🔌 WebSocket Client Starting...")
283                print("=" * 40)
284                print(f"🎯 Target Peer: {info.peer_id}")
285                print(f"📍 Target Address: {destination}")
286                print()
287
288                try:
289                    print("🔗 Connecting to WebSocket server...")
290                    print(f"   Security: {'Plaintext' if use_plaintext else 'Noise'}")
291                    await host.connect(info)
292                    print("✅ Successfully connected to WebSocket server!")
293                except Exception as e:
294                    error_msg = str(e)
295                    print("\n❌ Connection Failed!")
296                    print(f"   Peer ID: {info.peer_id}")
297                    print(f"   Address: {destination}")
298                    print(f"   Security: {'Plaintext' if use_plaintext else 'Noise'}")
299                    print(f"   Error: {error_msg}")
300                    print(f"   Error type: {type(e).__name__}")
301
302                    # Add more detailed error information for debugging
303                    if hasattr(e, "__cause__") and e.__cause__:
304                        print(f"   Root cause: {e.__cause__}")
305                        print(f"   Root cause type: {type(e.__cause__).__name__}")
306
307                    print()
308                    print("💡 Troubleshooting:")
309                    print("   • Make sure the WebSocket server is running")
310                    print("   • Check that the server address is correct")
311                    print("   • Verify the server is listening on the right port")
312                    print(
313                        "   • Ensure both client and server use the same sec protocol"
314                    )
315                    if not use_plaintext:
316                        print("   • Noise over WebSocket may have compatibility issues")
317                    return
318
319                # Create a stream and send test data
320                try:
321                    stream = await host.new_stream(info.peer_id, [ECHO_PROTOCOL_ID])
322                except Exception as e:
323                    print(f"❌ Failed to create stream: {e}")
324                    return
325
326                try:
327                    print("🚀 Starting Echo Protocol Test...")
328                    print("─" * 40)
329
330                    # Send test data
331                    test_message = b"Hello WebSocket Transport!"
332                    print(f"📤 Sending message: {test_message.decode('utf-8')}")
333                    await stream.write(test_message)
334
335                    # Read response
336                    print("⏳ Waiting for server response...")
337                    response = await stream.read(1024)
338                    print(f"📥 Received response: {response.decode('utf-8')}")
339
340                    await stream.close()
341
342                    print("─" * 40)
343                    if response == test_message:
344                        print("🎉 Echo test successful!")
345                        print("✅ WebSocket transport is working perfectly!")
346                        print("✅ Client completed successfully, exiting.")
347                    else:
348                        print("❌ Echo test failed!")
349                        print("   Response doesn't match sent data.")
350                        print(f"   Sent: {test_message}")
351                        print(f"   Received: {response}")
352
353                except Exception as e:
354                    error_msg = str(e)
355                    print(f"Echo protocol error: {error_msg}")
356                    traceback.print_exc()
357                finally:
358                    # Ensure stream is closed
359                    try:
360                        if stream:
361                            # Check if stream has is_closed method and use it
362                            has_is_closed = hasattr(stream, "is_closed") and callable(
363                                getattr(stream, "is_closed")
364                            )
365                            if has_is_closed:
366                                # type: ignore[attr-defined]
367                                if not await stream.is_closed():
368                                    await stream.close()
369                            else:
370                                # Fallback: just try to close the stream
371                                await stream.close()
372                    except Exception:
373                        pass
374
375                # host.run() context manager handles cleanup automatically
376                print()
377                print("🎉 WebSocket Demo Completed Successfully!")
378                print("=" * 50)
379                print("✅ WebSocket transport is working perfectly!")
380                print("✅ Echo protocol communication successful!")
381                print("✅ libp2p integration verified!")
382                print()
383                print("🚀 Your WebSocket transport is ready for production use!")
384
385                # Add a small delay to ensure all cleanup is complete
386                await trio.sleep(0.1)
387
388        except Exception as e:
389            print(f"❌ Error creating WebSocket client: {e}")
390            traceback.print_exc()
391            return
392
393
394def main() -> None:
395    description = """
396    This program demonstrates the libp2p WebSocket transport.
397    First run
398    'python websocket_demo.py -p <PORT> [--plaintext]' to start a WebSocket server.
399    Then run
400    'python websocket_demo.py <ANOTHER_PORT> -d <DESTINATION> [--plaintext]'
401    where <DESTINATION> is the multiaddress shown by the server.
402
403    By default, this example uses Noise encryption for secure communication.
404    Use --plaintext for testing with unencrypted communication
405    (not recommended for production).
406    """
407
408    example_maddr = (
409        "/ip4/127.0.0.1/tcp/8888/ws/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
410    )
411
412    parser = argparse.ArgumentParser(description=description)
413    parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
414    parser.add_argument(
415        "-d",
416        "--destination",
417        type=str,
418        help=f"destination multiaddr string, e.g. {example_maddr}",
419    )
420    parser.add_argument(
421        "--plaintext",
422        action="store_true",
423        help=(
424            "use plaintext security instead of Noise encryption "
425            "(not recommended for production)"
426        ),
427    )
428
429    args = parser.parse_args()
430
431    # Determine security mode: use Noise by default,
432    # plaintext if --plaintext is specified
433    use_plaintext = args.plaintext
434
435    try:
436        trio.run(run, args.port, args.destination, use_plaintext)
437    except KeyboardInterrupt:
438        # This is expected when Ctrl+C is pressed
439        # The signal handler already printed the shutdown message
440        print("✅ Clean exit completed.")
441        return
442    except Exception as e:
443        print(f"❌ Unexpected error: {e}")
444        return
445
446
447if __name__ == "__main__":
448    main()

Multiaddr Formats

WebSocket transport supports various multiaddr formats:

  • WS (Insecure): /ip4/127.0.0.1/tcp/8080/ws

  • WSS (Secure): /ip4/127.0.0.1/tcp/8080/wss

  • TLS/WS Format: /ip4/127.0.0.1/tcp/8080/tls/ws

  • IPv6: /ip6/::1/tcp/8080/ws

  • DNS: /dns/example.com/tcp/443/wss

Production Considerations

  1. Security: - Always use WSS in production - Use proper CA-signed certificates - Enable certificate validation - AutoTLS is for development/testing only

  2. Performance: - Configure appropriate connection limits - Set reasonable timeouts - Monitor connection statistics - Use connection pooling for high-traffic scenarios

  3. Error Handling: - Handle OpenConnectionError for connection failures - Implement retry logic for transient failures - Monitor failed connection counts

  4. Proxy Configuration: - Use environment variables for flexible deployment - Test proxy connectivity before production deployment - Monitor proxy connection statistics

See Also