WebSocket Transport Examples
This guide demonstrates how to use the WebSocket transport in py-libp2p, including WS (WebSocket) and WSS (WebSocket Secure) protocols, SOCKS proxy support, AutoTLS, and advanced configuration options.
Quick Start
Basic WebSocket transport setup:
from libp2p import new_host
from libp2p.transport.websocket import WebsocketTransport, WebsocketConfig
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.custom_types import TProtocol
from libp2p.crypto.rsa import create_new_key_pair
from multiaddr import Multiaddr
# Generate a key pair
key_pair = create_new_key_pair()
# Create upgrader with security and muxer
upgrader = TransportUpgrader(
    secure_transports_by_protocol={
        TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
    },
    muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
)
# Create WebSocket transport
config = WebsocketConfig()
transport = WebsocketTransport(upgrader, config=config)
# Create host with WebSocket transport
# Note: new_host handles transport creation internally if not overridden,
# but this shows how to set it up manually if needed.
host = new_host(
    listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")]
)
WS vs WSS
WS (Insecure WebSocket):
Use for development and testing. No TLS encryption:
ws_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
host = new_host(listen_addrs=[ws_addr])
WSS (Secure WebSocket):
Use for production. Requires TLS configuration:
import ssl
# Create TLS context
tls_context = ssl.create_default_context()
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE # For testing only
# Configure transport with TLS
config = WebsocketConfig(tls_server_config=tls_context)
transport = WebsocketTransport(upgrader, config=config)
wss_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/wss")
host = new_host(listen_addrs=[wss_addr])
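For a listener, the TLS context normally has to be a server-side context with a certificate chain loaded. The following is a minimal sketch using only the standard library ssl module; make_server_tls_context is a hypothetical helper (not part of py-libp2p), and the certificate paths in the usage note are placeholders. It assumes the resulting context can be passed as tls_server_config, as in the snippet above.

```python
import ssl
from typing import Optional


def make_server_tls_context(
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
) -> ssl.SSLContext:
    """Build a server-side TLS context (hypothetical helper for illustration)."""
    # PROTOCOL_TLS_SERVER selects server-side defaults: the context does not
    # request client certificates and does not check hostnames.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Refuse protocol versions older than TLS 1.2.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # Load the certificate chain and private key only when both are supplied,
    # so the helper can also be exercised without files on disk.
    if cert_file and key_file:
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


# Without a certificate this context cannot complete handshakes; it only
# shows the resulting settings.
ctx = make_server_tls_context()
```

In a real deployment you would call make_server_tls_context("path/to/cert.pem", "path/to/key.pem") and pass the result to WebsocketConfig(tls_server_config=...).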
SOCKS Proxy Configuration
Using Proxy Factory Function:
from libp2p.transport.websocket import WithProxy
# Configure with SOCKS5 proxy
config = WithProxy(
    proxy_url="socks5://proxy.example.com:1080",
    auth=("username", "password"),  # Optional
)
transport = WebsocketTransport(upgrader, config=config)
Using Environment Variables:
from libp2p.transport.websocket import WithProxyFromEnvironment
# Reads HTTP_PROXY or HTTPS_PROXY environment variables
config = WithProxyFromEnvironment()
transport = WebsocketTransport(upgrader, config=config)
Environment Variable Format:
export HTTP_PROXY=socks5://proxy.example.com:1080
export HTTPS_PROXY=socks5://proxy.example.com:1080
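To make the expected URL format concrete, here is a small standard-library sketch that parses a proxy URL of this shape. It does not reproduce how WithProxyFromEnvironment works internally; ProxySettings, parse_proxy_url and proxy_from_environ are hypothetical names, and the rule that HTTPS_PROXY applies to secure connections while HTTP_PROXY applies to insecure ones is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class ProxySettings:
    scheme: str
    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None


def parse_proxy_url(url: str) -> ProxySettings:
    """Split a proxy URL such as socks5://user:pw@host:1080 into its parts."""
    parts = urlsplit(url)
    if not parts.scheme or not parts.hostname:
        raise ValueError(f"not a valid proxy URL: {url!r}")
    # 1080 is the conventional SOCKS port; used when the URL omits one.
    port = parts.port if parts.port is not None else 1080
    return ProxySettings(
        parts.scheme, parts.hostname, port, parts.username, parts.password
    )


def proxy_from_environ(
    environ: Mapping[str, str], secure: bool
) -> Optional[ProxySettings]:
    """Pick a proxy from an environment mapping (assumed precedence rule)."""
    # Assumption: HTTPS_PROXY for wss:// connections, HTTP_PROXY for ws://.
    name = "HTTPS_PROXY" if secure else "HTTP_PROXY"
    url = environ.get(name)
    return parse_proxy_url(url) if url else None


settings = parse_proxy_url("socks5://username:password@proxy.example.com:1080")
```

Passing os.environ as the mapping checks the variables exported above; a malformed value raises ValueError early instead of failing at dial time.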
AutoTLS Configuration
AutoTLS automatically generates and manages TLS certificates for browser integration:
from libp2p.transport.websocket import WithAutoTLS, AutoTLSConfig
autotls_config = AutoTLSConfig(
    enabled=True,
    default_domain="localhost",
    cert_validity_days=365,
)
config = WithAutoTLS(autotls_config)
transport = WebsocketTransport(upgrader, config=config)
Browser Integration:
AutoTLS is particularly useful for browser-based applications where you need trusted certificates for WebSocket connections.
Advanced TLS Configuration
For production deployments with custom certificates:
from libp2p.transport.websocket import WithAdvancedTLS
from libp2p.transport.websocket.tls_config import (
    TLSConfig,
    CertificateValidationMode,
)
tls_config = TLSConfig(
    cert_file="path/to/cert.pem",
    key_file="path/to/key.pem",
    ca_file="path/to/ca.pem",
    validation_mode=CertificateValidationMode.STRICT,
)
config = WithAdvancedTLS(tls_config)
transport = WebsocketTransport(upgrader, config=config)
Connection Management
Configure connection limits and timeouts:
from libp2p.transport.websocket import WebsocketConfig, WebsocketTransport
config = WebsocketConfig(
    max_connections=1000,
    handshake_timeout=15.0,  # seconds
    max_buffered_amount=4 * 1024 * 1024,  # 4 MiB
    max_message_size=32 * 1024 * 1024,  # 32 MiB
)
transport = WebsocketTransport(upgrader, config=config)
Monitoring Connection Statistics:
# Get transport statistics
stats = transport.get_stats()
print(f"Total connections: {stats['total_connections']}")
print(f"Current connections: {stats['current_connections']}")
print(f"Failed connections: {stats['failed_connections']}")
# Get connection details (await must run inside an async function)
connections = await transport.get_connections()
for conn_id, conn in connections.items():
    conn_stats = conn.get_stats()
    print(f"Connection {conn_id}: {conn_stats}")
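The statistics dictionary can be turned into a simple health signal. The sketch below is a hedged illustration that relies only on the keys shown above (total_connections, current_connections, failed_connections); summarize_stats and the 90% warning threshold are hypothetical choices, not part of the library.

```python
from typing import Mapping


def summarize_stats(
    stats: Mapping[str, int],
    max_connections: int,
    warn_ratio: float = 0.9,
) -> dict:
    """Derive utilization figures from a transport stats mapping."""
    current = stats.get("current_connections", 0)
    # Fraction of the configured connection limit currently in use.
    utilization = current / max_connections if max_connections > 0 else 0.0
    return {
        "utilization": utilization,
        # True once the transport is close to its configured limit.
        "near_limit": utilization >= warn_ratio,
        "failed": stats.get("failed_connections", 0),
        "total": stats.get("total_connections", 0),
    }


summary = summarize_stats(
    {"total_connections": 1200, "current_connections": 950, "failed_connections": 3},
    max_connections=1000,
)
```

In practice you would feed transport.get_stats() into the helper on a timer and log or alert when near_limit becomes true.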
Complete Example
A complete example demonstrating WebSocket transport with all features:
import argparse
import logging
import signal
import sys
import traceback

import multiaddr
import trio

from libp2p.abc import INotifee
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
from libp2p.custom_types import TProtocol
from libp2p.host.basic_host import BasicHost
from libp2p.network.swarm import Swarm
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.peer.peerstore import PeerStore
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.security.noise.transport import (
    PROTOCOL_ID as NOISE_PROTOCOL_ID,
    Transport as NoiseTransport,
)
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.transport.websocket.transport import WebsocketTransport

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger("libp2p.websocket-example")


# Suppress KeyboardInterrupt by handling SIGINT directly
def signal_handler(signum, frame):
    print("✅ Clean exit completed.")
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

# Simple echo protocol
ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0")


async def echo_handler(stream):
    """Simple echo handler that echoes back any data received."""
    try:
        data = await stream.read(1024)
        if data:
            message = data.decode("utf-8", errors="replace")
            print(f"📥 Received: {message}")
            print(f"📤 Echoing back: {message}")
            await stream.write(data)
        # Close the stream whether or not any data arrived
        await stream.close()
    except Exception as e:
        logger.error(f"Echo handler error: {e}")
        await stream.close()


def create_websocket_host(listen_addrs=None, use_plaintext=False):
    """Create a host with WebSocket transport."""
    # Create key pair and peer store
    key_pair = create_new_key_pair()
    peer_id = ID.from_pubkey(key_pair.public_key)
    peer_store = PeerStore()
    peer_store.add_key_pair(peer_id, key_pair)

    if use_plaintext:
        # Create transport upgrader with plaintext security
        upgrader = TransportUpgrader(
            secure_transports_by_protocol={
                TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
            },
            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
        )
    else:
        # Separate X25519 keypair for Noise static DH (libp2p Noise spec); identity
        # uses secp256k1 key_pair above.
        noise_key_pair = create_new_x25519_key_pair()

        # Create Noise transport
        noise_transport = NoiseTransport(
            libp2p_keypair=key_pair,
            noise_privkey=noise_key_pair.private_key,
            early_data=None,
        )

        # Create transport upgrader with Noise security
        upgrader = TransportUpgrader(
            secure_transports_by_protocol={
                TProtocol(NOISE_PROTOCOL_ID): noise_transport
            },
            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
        )

    # Create WebSocket transport
    transport = WebsocketTransport(upgrader)

    # Create swarm and host
    swarm = Swarm(peer_id, peer_store, upgrader, transport)
    host = BasicHost(swarm)

    return host


async def run(port: int, destination: str, use_plaintext: bool = False) -> None:
    localhost_ip = "0.0.0.0"

    if not destination:
        # Create first host (listener) with WebSocket transport
        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")

        try:
            host = create_websocket_host(use_plaintext=use_plaintext)
            logger.debug(f"Created host with use_plaintext={use_plaintext}")

            # Set up echo handler
            host.set_stream_handler(ECHO_PROTOCOL_ID, echo_handler)

            # Add connection event handlers for debugging
            class DebugNotifee(INotifee):
                async def opened_stream(self, network, stream):
                    pass

                async def closed_stream(self, network, stream):
                    pass

                async def connected(self, network, conn):
                    print(
                        f"🔗 New libp2p connection established: "
                        f"{conn.muxed_conn.peer_id}"
                    )
                    if hasattr(conn.muxed_conn, "get_security_protocol"):
                        security = conn.muxed_conn.get_security_protocol()
                    else:
                        security = "Unknown"

                    print(f" Security: {security}")

                async def disconnected(self, network, conn):
                    print(f"🔌 libp2p connection closed: {conn.muxed_conn.peer_id}")

                async def listen(self, network, multiaddr):
                    pass

                async def listen_close(self, network, multiaddr):
                    pass

            host.get_network().register_notifee(DebugNotifee())

            # Create a cancellation token for clean shutdown
            cancel_scope = trio.CancelScope()

            async def signal_handler():
                with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as (
                    signal_receiver
                ):
                    async for sig in signal_receiver:
                        print(f"\n🛑 Received signal {sig}")
                        print("✅ Shutting down WebSocket server...")
                        cancel_scope.cancel()
                        return

            async with (
                host.run(listen_addrs=[listen_addr]),
                trio.open_nursery() as (nursery),
            ):
                # Start the peer-store cleanup task
                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

                # Start the signal handler
                nursery.start_soon(signal_handler)

                # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
                # connections
                addrs = host.get_addrs()
                logger.debug(f"Host addresses: {addrs}")
                if not addrs:
                    print("❌ Error: No addresses found for the host")
                    print("Debug: host.get_addrs() returned empty list")
                    return

                server_addr = str(addrs[0])
                client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")

                print("🌐 WebSocket Server Started Successfully!")
                print("=" * 50)
                print(f"📍 Server Address: {client_addr}")
                print("🔧 Protocol: /echo/1.0.0")
                print("🚀 Transport: WebSocket (/ws)")
                print()
                print("📋 To test the connection, run this in another terminal:")
                plaintext_flag = " --plaintext" if use_plaintext else ""
                print(f" python websocket_demo.py -d {client_addr}{plaintext_flag}")
                print()
                print("⏳ Waiting for incoming WebSocket connections...")
                print("─" * 50)

                # Add a custom handler to show connection events
                async def custom_echo_handler(stream):
                    peer_id = stream.muxed_conn.peer_id
                    print("\n🔗 New WebSocket Connection!")
                    print(f" Peer ID: {peer_id}")
                    print(" Protocol: /echo/1.0.0")

                    # Show remote address in multiaddr format
                    try:
                        remote_address = stream.get_remote_address()
                        if remote_address:
                            print(f" Remote: {remote_address}")
                    except Exception:
                        print(" Remote: Unknown")

                    print(" ─" * 40)

                    # Call the original handler
                    await echo_handler(stream)

                    print(" ─" * 40)
                    print(f"✅ Echo request completed for peer: {peer_id}")
                    print()

                # Replace the handler with our custom one
                host.set_stream_handler(ECHO_PROTOCOL_ID, custom_echo_handler)

                # Wait indefinitely or until cancelled
                with cancel_scope:
                    await trio.sleep_forever()

        except Exception as e:
            print(f"❌ Error creating WebSocket server: {e}")
            traceback.print_exc()
            return

    else:
        # Create second host (dialer) with WebSocket transport
        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")

        try:
            # Create a single host for client operations
            host = create_websocket_host(use_plaintext=use_plaintext)

            # Start the host for client operations
            async with (
                host.run(listen_addrs=[listen_addr]),
                trio.open_nursery() as (nursery),
            ):
                # Start the peer-store cleanup task
                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

                # Add connection event handlers for debugging
                class ClientDebugNotifee(INotifee):
                    async def opened_stream(self, network, stream):
                        pass

                    async def closed_stream(self, network, stream):
                        pass

                    async def connected(self, network, conn):
                        print(
                            f"🔗 Client: libp2p connection established: "
                            f"{conn.muxed_conn.peer_id}"
                        )

                    async def disconnected(self, network, conn):
                        print(
                            f"🔌 Client: libp2p connection closed: "
                            f"{conn.muxed_conn.peer_id}"
                        )

                    async def listen(self, network, multiaddr):
                        pass

                    async def listen_close(self, network, multiaddr):
                        pass

                host.get_network().register_notifee(ClientDebugNotifee())

                maddr = multiaddr.Multiaddr(destination)
                info = info_from_p2p_addr(maddr)
                print("🔌 WebSocket Client Starting...")
                print("=" * 40)
                print(f"🎯 Target Peer: {info.peer_id}")
                print(f"📍 Target Address: {destination}")
                print()

                try:
                    print("🔗 Connecting to WebSocket server...")
                    print(f" Security: {'Plaintext' if use_plaintext else 'Noise'}")
                    await host.connect(info)
                    print("✅ Successfully connected to WebSocket server!")
                except Exception as e:
                    error_msg = str(e)
                    print("\n❌ Connection Failed!")
                    print(f" Peer ID: {info.peer_id}")
                    print(f" Address: {destination}")
                    print(f" Security: {'Plaintext' if use_plaintext else 'Noise'}")
                    print(f" Error: {error_msg}")
                    print(f" Error type: {type(e).__name__}")

                    # Add more detailed error information for debugging
                    if hasattr(e, "__cause__") and e.__cause__:
                        print(f" Root cause: {e.__cause__}")
                        print(f" Root cause type: {type(e.__cause__).__name__}")

                    print()
                    print("💡 Troubleshooting:")
                    print(" • Make sure the WebSocket server is running")
                    print(" • Check that the server address is correct")
                    print(" • Verify the server is listening on the right port")
                    print(
                        " • Ensure both client and server use the same sec protocol"
                    )
                    if not use_plaintext:
                        print(" • Noise over WebSocket may have compatibility issues")
                    return

                # Create a stream and send test data
                try:
                    stream = await host.new_stream(info.peer_id, [ECHO_PROTOCOL_ID])
                except Exception as e:
                    print(f"❌ Failed to create stream: {e}")
                    return

                try:
                    print("🚀 Starting Echo Protocol Test...")
                    print("─" * 40)

                    # Send test data
                    test_message = b"Hello WebSocket Transport!"
                    print(f"📤 Sending message: {test_message.decode('utf-8')}")
                    await stream.write(test_message)

                    # Read response
                    print("⏳ Waiting for server response...")
                    response = await stream.read(1024)
                    print(f"📥 Received response: {response.decode('utf-8')}")

                    await stream.close()

                    print("─" * 40)
                    if response == test_message:
                        print("🎉 Echo test successful!")
                        print("✅ WebSocket transport is working perfectly!")
                        print("✅ Client completed successfully, exiting.")
                    else:
                        print("❌ Echo test failed!")
                        print(" Response doesn't match sent data.")
                        print(f" Sent: {test_message}")
                        print(f" Received: {response}")

                except Exception as e:
                    error_msg = str(e)
                    print(f"Echo protocol error: {error_msg}")
                    traceback.print_exc()
                finally:
                    # Ensure stream is closed
                    try:
                        if stream:
                            # Check if stream has is_closed method and use it
                            has_is_closed = hasattr(stream, "is_closed") and callable(
                                getattr(stream, "is_closed")
                            )
                            if has_is_closed:
                                # type: ignore[attr-defined]
                                if not await stream.is_closed():
                                    await stream.close()
                            else:
                                # Fallback: just try to close the stream
                                await stream.close()
                    except Exception:
                        pass

                # host.run() context manager handles cleanup automatically
                print()
                print("🎉 WebSocket Demo Completed Successfully!")
                print("=" * 50)
                print("✅ WebSocket transport is working perfectly!")
                print("✅ Echo protocol communication successful!")
                print("✅ libp2p integration verified!")
                print()
                print("🚀 Your WebSocket transport is ready for production use!")

                # Add a small delay to ensure all cleanup is complete
                await trio.sleep(0.1)

        except Exception as e:
            print(f"❌ Error creating WebSocket client: {e}")
            traceback.print_exc()
            return


def main() -> None:
    description = """
    This program demonstrates the libp2p WebSocket transport.
    First run
    'python websocket_demo.py -p <PORT> [--plaintext]' to start a WebSocket server.
    Then run
    'python websocket_demo.py -p <ANOTHER_PORT> -d <DESTINATION> [--plaintext]'
    where <DESTINATION> is the multiaddress shown by the server.

    By default, this example uses Noise encryption for secure communication.
    Use --plaintext for testing with unencrypted communication
    (not recommended for production).
    """

    example_maddr = (
        "/ip4/127.0.0.1/tcp/8888/ws/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
    parser.add_argument(
        "-d",
        "--destination",
        type=str,
        help=f"destination multiaddr string, e.g. {example_maddr}",
    )
    parser.add_argument(
        "--plaintext",
        action="store_true",
        help=(
            "use plaintext security instead of Noise encryption "
            "(not recommended for production)"
        ),
    )

    args = parser.parse_args()

    # Determine security mode: use Noise by default,
    # plaintext if --plaintext is specified
    use_plaintext = args.plaintext

    try:
        trio.run(run, args.port, args.destination, use_plaintext)
    except KeyboardInterrupt:
        # This is expected when Ctrl+C is pressed
        # The signal handler already printed the shutdown message
        print("✅ Clean exit completed.")
        return
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return


if __name__ == "__main__":
    main()
Multiaddr Formats
WebSocket transport supports various multiaddr formats:
WS (Insecure):
/ip4/127.0.0.1/tcp/8080/ws
WSS (Secure):
/ip4/127.0.0.1/tcp/8080/wss
TLS/WS Format:
/ip4/127.0.0.1/tcp/8080/tls/ws
IPv6:
/ip6/::1/tcp/8080/ws
DNS:
/dns/example.com/tcp/443/wss
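As a rough illustration of how these formats differ, the sketch below classifies a multiaddr string as secure or insecure using plain string handling. It is not the parser the transport uses; is_secure_websocket is a hypothetical helper, and it assumes that /wss and /tls/ws both denote a TLS-protected WebSocket, as the list above suggests.

```python
def is_secure_websocket(addr: str) -> bool:
    """Return True for /wss or /tls/ws addresses, False for plain /ws."""
    # Split into path components, dropping the empty leading segment.
    parts = [p for p in addr.split("/") if p]
    if "wss" in parts:
        return True
    if "ws" in parts:
        idx = parts.index("ws")
        # /tls/ws is treated as equivalent to /wss.
        return idx > 0 and parts[idx - 1] == "tls"
    raise ValueError(f"not a WebSocket multiaddr: {addr!r}")


examples = [
    "/ip4/127.0.0.1/tcp/8080/ws",
    "/ip4/127.0.0.1/tcp/8080/wss",
    "/ip4/127.0.0.1/tcp/8080/tls/ws",
    "/ip6/::1/tcp/8080/ws",
    "/dns/example.com/tcp/443/wss",
]
flags = [is_secure_websocket(a) for a in examples]
```

A check like this can guard a production configuration against accidentally listening on a plain /ws address.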
Production Considerations
Security:
- Always use WSS in production
- Use proper CA-signed certificates
- Enable certificate validation
- AutoTLS is for development/testing only
Performance:
- Configure appropriate connection limits
- Set reasonable timeouts
- Monitor connection statistics
- Use connection pooling for high-traffic scenarios
Error Handling:
- Handle OpenConnectionError for connection failures
- Implement retry logic for transient failures
- Monitor failed connection counts
Proxy Configuration:
- Use environment variables for flexible deployment
- Test proxy connectivity before production deployment
- Monitor proxy connection statistics
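The retry advice under Error Handling can be sketched as a small exponential-backoff helper. This is one possible approach, not something the library provides: retry_with_backoff and its parameters are hypothetical, and the sleep function is injected so the same helper works with trio.sleep in the examples of this guide. The demo uses asyncio and ConnectionError only to stay dependency-free.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    sleep: Callable[[float], Awaitable[None]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Run operation, retrying on the given exceptions with doubling delays."""
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            # Out of attempts: re-raise the last error to the caller.
            if attempt == attempts - 1:
                raise
            await sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")


async def _demo() -> Tuple[str, List[float]]:
    calls = 0
    delays: List[float] = []

    async def flaky_dial() -> str:
        # Fails twice, then succeeds, to simulate a transient outage.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
        return "connected"

    async def fake_sleep(delay: float) -> None:
        # Record the delay instead of actually waiting.
        delays.append(delay)

    result = await retry_with_backoff(
        flaky_dial, sleep=fake_sleep, retry_on=(ConnectionError,)
    )
    return result, delays


result, delays = asyncio.run(_demo())
```

With py-libp2p you would wrap the dial, for example lambda: host.connect(info), pass sleep=trio.sleep, and list OpenConnectionError in retry_on.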
See Also
libp2p.transport.websocket package - Full API documentation
Echo Demo - Basic echo protocol example
QUIC Echo Demo - QUIC transport example for comparison