WebSocket Transport Examples
This guide demonstrates how to use the WebSocket transport in py-libp2p, including WS (WebSocket) and WSS (WebSocket Secure) protocols, SOCKS proxy support, AutoTLS, and advanced configuration options.
Quick Start
Basic WebSocket transport setup:
from libp2p import new_host
from libp2p.transport.websocket import WebsocketTransport, WebsocketConfig
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.custom_types import TProtocol
from libp2p.crypto.rsa import create_new_key_pair
from multiaddr import Multiaddr
# Generate a key pair
key_pair = create_new_key_pair()
# Create upgrader with security and muxer
upgrader = TransportUpgrader(
    secure_transports_by_protocol={
        TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
    },
    muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux}
)
# Create WebSocket transport
config = WebsocketConfig()
transport = WebsocketTransport(upgrader, config=config)
# Create host with WebSocket transport
# Note: new_host handles transport creation internally if not overridden,
# but this shows how to set it up manually if needed.
host = new_host(
    listen_addrs=[Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")]
)
WS vs WSS
WS (Insecure WebSocket):
Use for development and testing. No TLS encryption:
ws_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/ws")
host = new_host(listen_addrs=[ws_addr])
WSS (Secure WebSocket):
Use for production. Requires TLS configuration:
import ssl
# Create TLS context
tls_context = ssl.create_default_context()
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE # For testing only
# Configure transport with TLS
config = WebsocketConfig(tls_server_config=tls_context)
transport = WebsocketTransport(upgrader, config=config)
wss_addr = Multiaddr("/ip4/127.0.0.1/tcp/8080/wss")
host = new_host(listen_addrs=[wss_addr])
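The settings used above (check_hostname = False, verify_mode = ssl.CERT_NONE) are client-side options; a WSS listener also needs a certificate chain to present. The following is a minimal sketch using only the standard-library ssl module. The function names and the PEM file paths are illustrative assumptions, not part of py-libp2p.

```python
import ssl


def make_server_context(cert_file: str, key_file: str) -> ssl.SSLContext:
    """Server-side context that presents the given certificate chain."""
    # Purpose.CLIENT_AUTH yields a context suitable for the listening side.
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


def make_insecure_client_context() -> ssl.SSLContext:
    """Client-side context that skips verification. For local testing only."""
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    # check_hostname must be disabled before verify_mode can be CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx
```

A context built by make_server_context could then be passed as tls_server_config, in the same way as tls_context above.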
SOCKS Proxy Configuration
Using Proxy Factory Function:
from libp2p.transport.websocket import WithProxy
# Configure with SOCKS5 proxy
config = WithProxy(
    proxy_url="socks5://proxy.example.com:1080",
    auth=("username", "password")  # Optional
)
transport = WebsocketTransport(upgrader, config=config)
Using Environment Variables:
from libp2p.transport.websocket import WithProxyFromEnvironment
# Reads HTTP_PROXY or HTTPS_PROXY environment variables
config = WithProxyFromEnvironment()
transport = WebsocketTransport(upgrader, config=config)
Environment Variable Format:
export HTTP_PROXY=socks5://proxy.example.com:1080
export HTTPS_PROXY=socks5://proxy.example.com:1080
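To make the variable format concrete, here is a hedged sketch of how such a proxy URL could be read and split into its parts with the standard library. The helper name proxy_from_environment and the lookup order (HTTPS_PROXY before HTTP_PROXY) are assumptions for illustration; they do not describe how WithProxyFromEnvironment is implemented.

```python
import os
from urllib.parse import urlsplit


def proxy_from_environment(environ=os.environ):
    """Return (scheme, host, port, auth) for the first proxy variable set, else None."""
    # Assumed precedence: HTTPS_PROXY first, then HTTP_PROXY.
    for name in ("HTTPS_PROXY", "HTTP_PROXY"):
        url = environ.get(name)
        if not url:
            continue
        parts = urlsplit(url)
        # Credentials are optional: socks5://user:pass@host:port
        auth = (parts.username, parts.password) if parts.username else None
        return parts.scheme, parts.hostname, parts.port, auth
    return None
```

Passing a plain dict instead of os.environ makes the helper easy to exercise in tests.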
AutoTLS Configuration
AutoTLS automatically generates and manages TLS certificates for browser integration:
from libp2p.transport.websocket import WithAutoTLS, AutoTLSConfig
autotls_config = AutoTLSConfig(
    enabled=True,
    default_domain="localhost",
    cert_validity_days=365
)
config = WithAutoTLS(autotls_config)
transport = WebsocketTransport(upgrader, config=config)
Browser Integration:
AutoTLS is particularly useful for browser-based applications where you need trusted certificates for WebSocket connections.
Advanced TLS Configuration
For production deployments with custom certificates:
from libp2p.transport.websocket import WithAdvancedTLS
from libp2p.transport.websocket.tls_config import (
    TLSConfig,
    CertificateValidationMode
)
tls_config = TLSConfig(
    cert_file="path/to/cert.pem",
    key_file="path/to/key.pem",
    ca_file="path/to/ca.pem",
    validation_mode=CertificateValidationMode.STRICT
)
config = WithAdvancedTLS(tls_config)
transport = WebsocketTransport(upgrader, config=config)
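How CertificateValidationMode.STRICT maps onto the underlying TLS settings is not stated here. As a point of comparison, the sketch below shows what strict validation usually means in standard-library terms: the peer certificate is required and its hostname is checked. The function name is hypothetical.

```python
import ssl
from typing import Optional


def make_strict_client_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Client context that requires a valid, hostname-matching certificate."""
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    if ca_file is not None:
        # Trust a private CA bundle in addition to the system defaults.
        ctx.load_verify_locations(cafile=ca_file)
    # These are already the defaults; set explicitly to document intent.
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True
    return ctx
```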
Connection Management
Configure connection limits and timeouts:
from libp2p.transport.websocket import WebsocketConfig
# WithMaxConnections and WithHandshakeTimeout are also exported by this package.
config = WebsocketConfig(
    max_connections=1000,
    handshake_timeout=15.0,
    max_buffered_amount=4 * 1024 * 1024,  # 4 MiB
    max_message_size=32 * 1024 * 1024  # 32 MiB
)
transport = WebsocketTransport(upgrader, config=config)
Monitoring Connection Statistics:
# Get transport statistics
stats = transport.get_stats()
print(f"Total connections: {stats['total_connections']}")
print(f"Current connections: {stats['current_connections']}")
print(f"Failed connections: {stats['failed_connections']}")
# Get connection details (await requires an async context, e.g. a trio task)
connections = await transport.get_connections()
for conn_id, conn in connections.items():
    conn_stats = conn.get_stats()
    print(f"Connection {conn_id}: {conn_stats}")
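The counters above can be turned into a simple health signal. This is a small sketch under a stated assumption: total_connections is taken to count every attempt, including failed ones, which the text does not confirm. The function names and the threshold value are illustrative.

```python
def failure_rate(stats: dict) -> float:
    """Fraction of attempts that failed; 0.0 when nothing was attempted."""
    # Assumption: total_connections includes failed attempts.
    total = stats.get("total_connections", 0)
    failed = stats.get("failed_connections", 0)
    return failed / total if total else 0.0


def should_alert(stats: dict, threshold: float = 0.1) -> bool:
    """True when the failure rate exceeds the given threshold."""
    return failure_rate(stats) > threshold
```

For example, should_alert(transport.get_stats()) could be polled periodically from a monitoring task.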
Complete Example
A complete example demonstrating WebSocket transport with all features:
import argparse
import logging
import signal
import sys
import traceback

import multiaddr
import trio

from libp2p.abc import INotifee
from libp2p.crypto.ed25519 import create_new_key_pair as create_ed25519_key_pair
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.custom_types import TProtocol
from libp2p.host.basic_host import BasicHost
from libp2p.network.swarm import Swarm
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.peer.peerstore import PeerStore
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.security.noise.transport import (
    PROTOCOL_ID as NOISE_PROTOCOL_ID,
    Transport as NoiseTransport,
)
from libp2p.stream_muxer.yamux.yamux import Yamux
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.transport.websocket.transport import WebsocketTransport

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger("libp2p.websocket-example")


# Suppress KeyboardInterrupt by handling SIGINT directly
def signal_handler(signum, frame):
    print("✅ Clean exit completed.")
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

# Simple echo protocol
ECHO_PROTOCOL_ID = TProtocol("/echo/1.0.0")


async def echo_handler(stream):
    """Simple echo handler that echoes back any data received."""
    try:
        data = await stream.read(1024)
        if data:
            message = data.decode("utf-8", errors="replace")
            print(f"📥 Received: {message}")
            print(f"📤 Echoing back: {message}")
            await stream.write(data)
        await stream.close()
    except Exception as e:
        logger.error(f"Echo handler error: {e}")
        await stream.close()


def create_websocket_host(listen_addrs=None, use_plaintext=False):
    """Create a host with WebSocket transport."""
    # Create key pair and peer store
    key_pair = create_new_key_pair()
    peer_id = ID.from_pubkey(key_pair.public_key)
    peer_store = PeerStore()
    peer_store.add_key_pair(peer_id, key_pair)

    if use_plaintext:
        # Create transport upgrader with plaintext security
        upgrader = TransportUpgrader(
            secure_transports_by_protocol={
                TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
            },
            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
        )
    else:
        # Create separate Ed25519 key for Noise protocol
        noise_key_pair = create_ed25519_key_pair()

        # Create Noise transport
        noise_transport = NoiseTransport(
            libp2p_keypair=key_pair,
            noise_privkey=noise_key_pair.private_key,
            early_data=None,
        )

        # Create transport upgrader with Noise security
        upgrader = TransportUpgrader(
            secure_transports_by_protocol={
                TProtocol(NOISE_PROTOCOL_ID): noise_transport
            },
            muxer_transports_by_protocol={TProtocol("/yamux/1.0.0"): Yamux},
        )

    # Create WebSocket transport
    transport = WebsocketTransport(upgrader)

    # Create swarm and host
    swarm = Swarm(peer_id, peer_store, upgrader, transport)
    host = BasicHost(swarm)

    return host


async def run(port: int, destination: str, use_plaintext: bool = False) -> None:
    localhost_ip = "0.0.0.0"

    if not destination:
        # Create first host (listener) with WebSocket transport
        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")

        try:
            host = create_websocket_host(use_plaintext=use_plaintext)
            logger.debug(f"Created host with use_plaintext={use_plaintext}")

            # Set up echo handler
            host.set_stream_handler(ECHO_PROTOCOL_ID, echo_handler)

            # Add connection event handlers for debugging
            class DebugNotifee(INotifee):
                async def opened_stream(self, network, stream):
                    pass

                async def closed_stream(self, network, stream):
                    pass

                async def connected(self, network, conn):
                    print(
                        f"🔗 New libp2p connection established: "
                        f"{conn.muxed_conn.peer_id}"
                    )
                    if hasattr(conn.muxed_conn, "get_security_protocol"):
                        security = conn.muxed_conn.get_security_protocol()
                    else:
                        security = "Unknown"

                    print(f" Security: {security}")

                async def disconnected(self, network, conn):
                    print(f"🔌 libp2p connection closed: {conn.muxed_conn.peer_id}")

                async def listen(self, network, multiaddr):
                    pass

                async def listen_close(self, network, multiaddr):
                    pass

            host.get_network().register_notifee(DebugNotifee())

            # Create a cancellation token for clean shutdown
            cancel_scope = trio.CancelScope()

            async def signal_handler():
                with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as (
                    signal_receiver
                ):
                    async for sig in signal_receiver:
                        print(f"\n🛑 Received signal {sig}")
                        print("✅ Shutting down WebSocket server...")
                        cancel_scope.cancel()
                        return

            async with (
                host.run(listen_addrs=[listen_addr]),
                trio.open_nursery() as (nursery),
            ):
                # Start the peer-store cleanup task
                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

                # Start the signal handler
                nursery.start_soon(signal_handler)

                # Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
                # connections
                addrs = host.get_addrs()
                logger.debug(f"Host addresses: {addrs}")
                if not addrs:
                    print("❌ Error: No addresses found for the host")
                    print("Debug: host.get_addrs() returned empty list")
                    return

                server_addr = str(addrs[0])
                client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")

                print("🌐 WebSocket Server Started Successfully!")
                print("=" * 50)
                print(f"📍 Server Address: {client_addr}")
                print("🔧 Protocol: /echo/1.0.0")
                print("🚀 Transport: WebSocket (/ws)")
                print()
                print("📋 To test the connection, run this in another terminal:")
                plaintext_flag = " --plaintext" if use_plaintext else ""
                print(f" python websocket_demo.py -d {client_addr}{plaintext_flag}")
                print()
                print("⏳ Waiting for incoming WebSocket connections...")
                print("─" * 50)

                # Add a custom handler to show connection events
                async def custom_echo_handler(stream):
                    peer_id = stream.muxed_conn.peer_id
                    print("\n🔗 New WebSocket Connection!")
                    print(f" Peer ID: {peer_id}")
                    print(" Protocol: /echo/1.0.0")

                    # Show remote address in multiaddr format
                    try:
                        remote_address = stream.get_remote_address()
                        if remote_address:
                            print(f" Remote: {remote_address}")
                    except Exception:
                        print(" Remote: Unknown")

                    print(" ─" * 40)

                    # Call the original handler
                    await echo_handler(stream)

                    print(" ─" * 40)
                    print(f"✅ Echo request completed for peer: {peer_id}")
                    print()

                # Replace the handler with our custom one
                host.set_stream_handler(ECHO_PROTOCOL_ID, custom_echo_handler)

                # Wait indefinitely or until cancelled
                with cancel_scope:
                    await trio.sleep_forever()

        except Exception as e:
            print(f"❌ Error creating WebSocket server: {e}")
            traceback.print_exc()
            return

    else:
        # Create second host (dialer) with WebSocket transport
        listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}/ws")

        try:
            # Create a single host for client operations
            host = create_websocket_host(use_plaintext=use_plaintext)

            # Start the host for client operations
            async with (
                host.run(listen_addrs=[listen_addr]),
                trio.open_nursery() as (nursery),
            ):
                # Start the peer-store cleanup task
                nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

                # Add connection event handlers for debugging
                class ClientDebugNotifee(INotifee):
                    async def opened_stream(self, network, stream):
                        pass

                    async def closed_stream(self, network, stream):
                        pass

                    async def connected(self, network, conn):
                        print(
                            f"🔗 Client: libp2p connection established: "
                            f"{conn.muxed_conn.peer_id}"
                        )

                    async def disconnected(self, network, conn):
                        print(
                            f"🔌 Client: libp2p connection closed: "
                            f"{conn.muxed_conn.peer_id}"
                        )

                    async def listen(self, network, multiaddr):
                        pass

                    async def listen_close(self, network, multiaddr):
                        pass

                host.get_network().register_notifee(ClientDebugNotifee())

                maddr = multiaddr.Multiaddr(destination)
                info = info_from_p2p_addr(maddr)
                print("🔌 WebSocket Client Starting...")
                print("=" * 40)
                print(f"🎯 Target Peer: {info.peer_id}")
                print(f"📍 Target Address: {destination}")
                print()

                try:
                    print("🔗 Connecting to WebSocket server...")
                    print(f" Security: {'Plaintext' if use_plaintext else 'Noise'}")
                    await host.connect(info)
                    print("✅ Successfully connected to WebSocket server!")
                except Exception as e:
                    error_msg = str(e)
                    print("\n❌ Connection Failed!")
                    print(f" Peer ID: {info.peer_id}")
                    print(f" Address: {destination}")
                    print(f" Security: {'Plaintext' if use_plaintext else 'Noise'}")
                    print(f" Error: {error_msg}")
                    print(f" Error type: {type(e).__name__}")

                    # Add more detailed error information for debugging
                    if hasattr(e, "__cause__") and e.__cause__:
                        print(f" Root cause: {e.__cause__}")
                        print(f" Root cause type: {type(e.__cause__).__name__}")

                    print()
                    print("💡 Troubleshooting:")
                    print(" • Make sure the WebSocket server is running")
                    print(" • Check that the server address is correct")
                    print(" • Verify the server is listening on the right port")
                    print(
                        " • Ensure both client and server use the same sec protocol"
                    )
                    if not use_plaintext:
                        print(" • Noise over WebSocket may have compatibility issues")
                    return

                # Create a stream and send test data
                try:
                    stream = await host.new_stream(info.peer_id, [ECHO_PROTOCOL_ID])
                except Exception as e:
                    print(f"❌ Failed to create stream: {e}")
                    return

                try:
                    print("🚀 Starting Echo Protocol Test...")
                    print("─" * 40)

                    # Send test data
                    test_message = b"Hello WebSocket Transport!"
                    print(f"📤 Sending message: {test_message.decode('utf-8')}")
                    await stream.write(test_message)

                    # Read response
                    print("⏳ Waiting for server response...")
                    response = await stream.read(1024)
                    print(f"📥 Received response: {response.decode('utf-8')}")

                    await stream.close()

                    print("─" * 40)
                    if response == test_message:
                        print("🎉 Echo test successful!")
                        print("✅ WebSocket transport is working perfectly!")
                        print("✅ Client completed successfully, exiting.")
                    else:
                        print("❌ Echo test failed!")
                        print(" Response doesn't match sent data.")
                        print(f" Sent: {test_message}")
                        print(f" Received: {response}")

                except Exception as e:
                    error_msg = str(e)
                    print(f"Echo protocol error: {error_msg}")
                    traceback.print_exc()
                finally:
                    # Ensure stream is closed
                    try:
                        if stream:
                            # Check if stream has is_closed method and use it
                            has_is_closed = hasattr(stream, "is_closed") and callable(
                                getattr(stream, "is_closed")
                            )
                            if has_is_closed:
                                # type: ignore[attr-defined]
                                if not await stream.is_closed():
                                    await stream.close()
                            else:
                                # Fallback: just try to close the stream
                                await stream.close()
                    except Exception:
                        pass

                # host.run() context manager handles cleanup automatically
                print()
                print("🎉 WebSocket Demo Completed Successfully!")
                print("=" * 50)
                print("✅ WebSocket transport is working perfectly!")
                print("✅ Echo protocol communication successful!")
                print("✅ libp2p integration verified!")
                print()
                print("🚀 Your WebSocket transport is ready for production use!")

                # Add a small delay to ensure all cleanup is complete
                await trio.sleep(0.1)

        except Exception as e:
            print(f"❌ Error creating WebSocket client: {e}")
            traceback.print_exc()
            return


def main() -> None:
    description = """
    This program demonstrates the libp2p WebSocket transport.
    First run
    'python websocket_demo.py -p <PORT> [--plaintext]' to start a WebSocket server.
    Then run
    'python websocket_demo.py -p <ANOTHER_PORT> -d <DESTINATION> [--plaintext]'
    where <DESTINATION> is the multiaddress shown by the server.

    By default, this example uses Noise encryption for secure communication.
    Use --plaintext for testing with unencrypted communication
    (not recommended for production).
    """

    example_maddr = (
        "/ip4/127.0.0.1/tcp/8888/ws/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
    parser.add_argument(
        "-d",
        "--destination",
        type=str,
        help=f"destination multiaddr string, e.g. {example_maddr}",
    )
    parser.add_argument(
        "--plaintext",
        action="store_true",
        help=(
            "use plaintext security instead of Noise encryption "
            "(not recommended for production)"
        ),
    )

    args = parser.parse_args()

    # Determine security mode: use Noise by default,
    # plaintext if --plaintext is specified
    use_plaintext = args.plaintext

    try:
        trio.run(run, args.port, args.destination, use_plaintext)
    except KeyboardInterrupt:
        # This is expected when Ctrl+C is pressed
        # The signal handler already printed the shutdown message
        print("✅ Clean exit completed.")
        return
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return


if __name__ == "__main__":
    main()
Multiaddr Formats
WebSocket transport supports various multiaddr formats:
WS (Insecure):
/ip4/127.0.0.1/tcp/8080/ws
WSS (Secure):
/ip4/127.0.0.1/tcp/8080/wss
TLS/WS Format:
/ip4/127.0.0.1/tcp/8080/tls/ws
IPv6:
/ip6/::1/tcp/8080/ws
DNS:
/dns/example.com/tcp/443/wss
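The formats above differ only in their trailing components, which a few lines of string handling can illustrate. This is a simplified sketch for explanation only: websocket_security is a hypothetical helper, it ignores variants such as /tls/sni/<host>/ws, and real code should parse addresses with the multiaddr library instead.

```python
def websocket_security(maddr: str) -> str:
    """Classify a multiaddr string as 'ws', 'wss', or 'other'."""
    parts = [p for p in maddr.split("/") if p]
    # Drop a trailing /p2p/<peer-id> suffix if present.
    if len(parts) >= 2 and parts[-2] == "p2p":
        parts = parts[:-2]
    if not parts:
        return "other"
    if parts[-1] == "wss":
        return "wss"
    if parts[-1] == "ws":
        # /tls/ws is the two-component spelling of a secure WebSocket address.
        return "wss" if len(parts) >= 2 and parts[-2] == "tls" else "ws"
    return "other"
```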
Production Considerations
Security:
- Always use WSS in production
- Use proper CA-signed certificates
- Enable certificate validation
- AutoTLS is for development/testing only
Performance:
- Configure appropriate connection limits
- Set reasonable timeouts
- Monitor connection statistics
- Use connection pooling for high-traffic scenarios
Error Handling:
- Handle OpenConnectionError for connection failures
- Implement retry logic for transient failures
- Monitor failed connection counts
Proxy Configuration:
- Use environment variables for flexible deployment
- Test proxy connectivity before production deployment
- Monitor proxy connection statistics
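The retry advice for transient failures can be sketched as exponential backoff around a dial attempt. This is one possible shape, not the library's mechanism: dial_with_retry and backoff_delays are hypothetical names, and the sleep function is injected so that trio.sleep can be supplied by the caller. In practice retry_on would be narrowed to the connection error type, such as OpenConnectionError.

```python
def backoff_delays(attempts: int, base: float = 0.5, cap: float = 10.0) -> list:
    """Delays between attempts: base, 2*base, 4*base, ... capped at cap."""
    return [min(cap, base * 2**i) for i in range(attempts - 1)]


async def dial_with_retry(dial, sleep, attempts: int = 4, retry_on=(Exception,)):
    """Await dial() up to `attempts` times, sleeping between failures."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delays = backoff_delays(attempts)
    for i in range(attempts):
        try:
            return await dial()
        except retry_on:
            # Re-raise once the final attempt has failed.
            if i == attempts - 1:
                raise
            await sleep(delays[i])
```

With trio, a call might look like await dial_with_retry(lambda: host.connect(info), trio.sleep), using the host and info objects from the complete example.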
See Also
libp2p.transport.websocket package - Full API documentation
Echo Demo - Basic echo protocol example
QUIC Echo Demo - QUIC transport example for comparison