Circuit Relay v2 Example
This example demonstrates how to use Circuit Relay v2 in py-libp2p.
$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ circuit-relay-demo --role relay --port 8000 --seed 1
Starting relay node...
Listening on: /ip4/0.0.0.0/tcp/8000/p2p/16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN
Copy the relay-id after from the line Listening on:,
open a new terminal in same folder and enter below commands
$ circuit-relay-demo --role destination --port 8001 --seed 2 --relay-addr 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN
Starting destination node...
Listening on: /ip4/0.0.0.0/tcp/8001/p2p/16Uiu2HAkyfVeK1wARrrgCUkEfohLpDcW34TkXc7r426vGcRhy3h2
# Post connection and message transfer
Received message (65 bytes): Hello from 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb!
Sent response to 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb
Copy the destination-id after from the line Listening on:,
open a new terminal in same folder and enter below commands
$ circuit-relay-demo --role source --port 8002 --seed 3 --relay-addr 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN --dest-id 16Uiu2HAkyfVeK1wARrrgCUkEfohLpDcW34TkXc7r426vGcRhy3h2
Starting source node...
Listening on: /ip4/0.0.0.0/tcp/36345/p2p/16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb
# Post connection and message transfer
Connected to relay 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN
Successfully connected to destination through relay!
Sent message to destination
Received response: Hello! This is 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb
Source operation completed
1"""
2Circuit Relay v2 Example.
3
4This example demonstrates using the Circuit Relay v2 protocol by setting up:
51. A relay node that facilitates connections
62. A destination node that accepts incoming connections
73. A source node that connects to the destination through the relay
8
9Usage:
10 # First terminal - start the relay:
11 python relay_example.py --role relay --port 8000
12
13 # Second terminal - start the destination:
14 python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID
15
16 # Third terminal - start the source:
17 python relay_example.py --role source \
18 --relay-addr RELAY_PEER_ID \
19 --dest-id DESTINATION_PEER_ID
20"""
21
22import argparse
23import logging
24import os
25import sys
26
27import multiaddr
28import trio
29
30from libp2p import new_host
31from libp2p.crypto.secp256k1 import create_new_key_pair
32from libp2p.custom_types import TProtocol
33from libp2p.network.stream.net_stream import INetStream
34from libp2p.peer.id import ID
35from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
36from libp2p.relay.circuit_v2.config import RelayConfig, RelayRole
37from libp2p.relay.circuit_v2.discovery import RelayDiscovery
38from libp2p.relay.circuit_v2.protocol import (
39 PROTOCOL_ID,
40 STOP_PROTOCOL_ID,
41 CircuitV2Protocol,
42)
43from libp2p.relay.circuit_v2.resources import RelayLimits
44from libp2p.relay.circuit_v2.transport import CircuitV2Transport
45from libp2p.tools.async_service import background_trio_service
46from libp2p.utils.logging import setup_logging as libp2p_setup_logging
47
48# Configure logging (default console for this example)
49logging.basicConfig(
50 level=logging.INFO,
51 format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
52)
53logger = logging.getLogger("circuit-relay-example")
54
55# Application protocol for our example
56EXAMPLE_PROTOCOL_ID = TProtocol("/circuit-relay-example/1.0.0")
57MAX_READ_LEN = 2**16 # 64KB
58
59
60async def handle_example_protocol(stream: INetStream) -> None:
61 """Handle incoming messages on our example protocol."""
62 remote_peer_id = stream.muxed_conn.peer_id
63 try:
64 remote_addr = stream.get_remote_address()
65 except Exception:
66 remote_addr = None
67 logger.debug(
68 "[APP] handle_example_protocol: incoming stream |",
69 "remote_peer=%s | remote_addr=%s | protocol=%s",
70 remote_peer_id,
71 remote_addr,
72 getattr(stream, "protocol_id", None),
73 )
74
75 try:
76 # Read the incoming message
77 logger.debug("[APP] waiting to read up to %s bytes from stream", MAX_READ_LEN)
78 msg = await stream.read(MAX_READ_LEN)
79 if msg:
80 logger.info(
81 "Received message (%d bytes): %s", len(msg), msg.decode(errors="ignore")
82 )
83
84 # Send a response
85 # Get the local peer ID from the secure connection
86 local_peer_id = stream.muxed_conn.peer_id
87 response = f"Hello! This is {local_peer_id}".encode()
88 logger.debug("[APP] writing %d bytes to stream", len(response))
89 await stream.write(response)
90 logger.info("Sent response to %s", remote_peer_id)
91 except Exception as e:
92 logger.exception("[APP] Error handling stream: %s", e)
93 finally:
94 try:
95 await stream.close()
96 logger.debug("[APP] stream closed")
97 except Exception:
98 logger.debug("[APP] stream close raised, attempting reset")
99 try:
100 await stream.reset()
101 except Exception:
102 pass
103
104
105async def setup_relay_node(port: int, seed: int | None = None) -> None:
106 """Set up and run a relay node."""
107 logger.info("Starting relay node...")
108
109 # Create host with a fixed key if seed is provided
110 key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None)
111 logger.debug("[RELAY] created key_pair=%s", type(key_pair).__name__)
112 host = new_host(key_pair=key_pair)
113 logger.debug("[RELAY] host initialized | peer_id=%s", host.get_id())
114
115 # Configure the relay
116 limits = RelayLimits(
117 duration=3600, # 1 hour
118 data=1024 * 1024 * 100, # 100 MB
119 max_circuit_conns=10,
120 max_reservations=5,
121 )
122
123 relay_config = RelayConfig(
124 roles=RelayRole.HOP | RelayRole.STOP | RelayRole.CLIENT, # All capabilities
125 limits=limits,
126 )
127
128 # Initialize the protocol
129 protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True)
130 logger.debug(
131 "[RELAY] CircuitV2Protocol initialized | allow_hop=%s |",
132 "limits(duration=%s,data=%s,max_circuit_conns=%s,max_reservations=%s)",
133 True,
134 limits.duration,
135 limits.data,
136 limits.max_circuit_conns,
137 limits.max_reservations,
138 )
139
140 # Start the host
141 listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
142
143 async with host.run(listen_addrs=[listen_addr]):
144 # Print information about this node
145 peer_id = host.get_id()
146 logger.info(f"Relay node started with ID: {peer_id}")
147
148 addrs = host.get_addrs()
149 for addr in addrs:
150 logger.info(f"Listening on: {addr}")
151
152 # Register protocol handlers
153 logger.debug("[RELAY] registering stream handlers")
154 host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol)
155 host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream)
156 host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream)
157 logger.debug("[RELAY] protocol handlers registered")
158
159 # Start the relay protocol service
160 async with background_trio_service(protocol):
161 logger.info("Circuit relay protocol started")
162
163 # Create and register the transport
164 CircuitV2Transport(host, protocol, relay_config)
165 logger.info(
166 "Circuit relay transport initialized | ",
167 "enable_hop=%r enable_stop=%r enable_client=%r",
168 relay_config.enable_hop,
169 relay_config.enable_stop,
170 relay_config.enable_client,
171 )
172
173 print("\nRelay node is running. Use the following address to connect:")
174 print(f"{addrs[0]}")
175 print("\nPress Ctrl+C to exit\n")
176
177 # Keep the relay running
178 await trio.sleep_forever()
179
180
181async def setup_destination_node(
182 port: int, relay_addr: str, seed: int | None = None
183) -> None:
184 """Set up and run a destination node that accepts incoming connections."""
185 logger.info("Starting destination node...")
186
187 # Create host with a fixed key if seed is provided
188 key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None)
189 host = new_host(key_pair=key_pair)
190 logger.debug("[DEST] host initialized | peer_id=%s", host.get_id())
191
192 # Configure the circuit relay client
193 limits = RelayLimits(
194 duration=3600, # 1 hour
195 data=1024 * 1024 * 100, # 100 MB
196 max_circuit_conns=10,
197 max_reservations=5,
198 )
199
200 relay_config = RelayConfig(
201 roles=RelayRole.STOP | RelayRole.CLIENT, # Accept connections and use relays
202 limits=limits,
203 )
204
205 # Initialize the protocol
206 protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False)
207 logger.debug(
208 "[DEST] CircuitV2Protocol initialized | allow_hop=%s |",
209 "limits(duration=%s,data=%s,max_circuit_conns=%s,max_reservations=%s)",
210 False,
211 limits.duration,
212 limits.data,
213 limits.max_circuit_conns,
214 limits.max_reservations,
215 )
216
217 # Start the host
218 listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
219
220 async with host.run(listen_addrs=[listen_addr]):
221 # Print information about this node
222 peer_id = host.get_id()
223 logger.info(f"Destination node started with ID: {peer_id}")
224
225 addrs = host.get_addrs()
226 for addr in addrs:
227 logger.info(f"Listening on: {addr}")
228
229 # Register protocol handlers
230 logger.debug("[DEST] registering stream handlers")
231 host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol)
232 host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream)
233 host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream)
234 logger.debug("[DEST] protocol handlers registered")
235
236 # Start the relay protocol service
237 async with background_trio_service(protocol):
238 logger.info("Circuit relay protocol started")
239
240 # Create and initialize transport
241 transport = CircuitV2Transport(host, protocol, relay_config)
242 logger.info(
243 "Circuit relay transport initialized | ",
244 "enable_hop=%r enable_stop=%r enable_client=%r",
245 relay_config.enable_hop,
246 relay_config.enable_stop,
247 relay_config.enable_client,
248 )
249
250 # Create discovery service
251 discovery = RelayDiscovery(host, auto_reserve=True)
252 transport.discovery = discovery
253 logger.info(
254 "[DEST] Relay discovery service created | auto_reserve=%s",
255 True,
256 )
257
258 # Start discovery service
259 async with background_trio_service(discovery):
260 logger.info("Relay discovery service started")
261
262 # Connect to the relay
263 if relay_addr:
264 logger.info(f"Connecting to relay at {relay_addr}")
265 try:
266 # Handle both peer ID only or full multiaddr formats
267 if relay_addr.startswith("/"):
268 # Full multiaddr format
269 relay_maddr = multiaddr.Multiaddr(relay_addr)
270 relay_info = info_from_p2p_addr(relay_maddr)
271 else:
272 # Assume it's just a peer ID
273 relay_peer_id = ID.from_string(relay_addr)
274 relay_info = PeerInfo(
275 relay_peer_id,
276 [
277 multiaddr.Multiaddr(
278 f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}"
279 )
280 ],
281 )
282 logger.info(
283 f"Using constructed address: {relay_info.addrs[0]}"
284 )
285
286 logger.debug(
287 "[DEST] attempting host.connect to relay %s",
288 relay_info.peer_id,
289 )
290 await host.connect(relay_info)
291 logger.info(f"Connected to relay {relay_info.peer_id}")
292 try:
293 connected = host.is_peer_connected(relay_info.peer_id) # type: ignore[attr-defined]
294 logger.debug("[DEST] relay connected? %s", connected)
295 except Exception:
296 pass
297 except Exception as e:
298 logger.exception("[DEST] Failed to connect to relay: %s", e)
299 return
300
301 print("\nDestination node is running with peer ID:")
302 print(f"{peer_id}")
303 print("\nPress Ctrl+C to exit\n")
304
305 # Keep the node running
306 await trio.sleep_forever()
307
308
309async def setup_source_node(
310 relay_addr: str, dest_id: str, seed: int | None = None
311) -> None:
312 """
313 Set up and run a source node that connects to the destination
314 through the relay.
315 """
316 logger.info("Starting source node...")
317
318 if not relay_addr:
319 logger.error("Relay address is required for source mode")
320 return
321
322 if not dest_id:
323 logger.error("Destination peer ID is required for source mode")
324 return
325
326 # Create host with a fixed key if seed is provided
327 key_pair = create_new_key_pair(generate_fixed_private_key(seed) if seed else None)
328 host = new_host(key_pair=key_pair)
329 logger.debug("[SRC] host initialized | peer_id=%s", host.get_id())
330
331 # Configure the circuit relay client
332 limits = RelayLimits(
333 duration=3600, # 1 hour
334 data=1024 * 1024 * 100, # 100 MB
335 max_circuit_conns=10,
336 max_reservations=5,
337 )
338
339 relay_config = RelayConfig(
340 roles=RelayRole.STOP | RelayRole.CLIENT, # Accept connections and use relays
341 limits=limits,
342 )
343
344 # Initialize the protocol
345 protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False)
346 logger.debug(
347 "[SRC] CircuitV2Protocol initialized | allow_hop=%s |",
348 "limits(duration=%d, data=%d, max_circuit_conns=%d, max_reservations=%d)",
349 False,
350 limits.duration,
351 limits.data,
352 limits.max_circuit_conns,
353 limits.max_reservations,
354 )
355
356 # Start the host
357 async with host.run(
358 listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")]
359 ): # Use ephemeral port
360 # Print information about this node
361 peer_id = host.get_id()
362 logger.info(f"Source node started with ID: {peer_id}")
363
364 # Get assigned address for debugging
365 addrs = host.get_addrs()
366 if addrs:
367 logger.info(f"Source node listening on: {addrs[0]}")
368
369 # Start the relay protocol service
370 async with background_trio_service(protocol):
371 logger.info("Circuit relay protocol started")
372
373 # Create and initialize transport
374 transport = CircuitV2Transport(host, protocol, relay_config)
375 logger.info(
376 "Circuit relay transport initialized | ",
377 "enable_hop=%r enable_stop=%r enable_client=%r",
378 relay_config.enable_hop,
379 relay_config.enable_stop,
380 relay_config.enable_client,
381 )
382
383 # Create discovery service
384 discovery = RelayDiscovery(host, auto_reserve=True)
385 transport.discovery = discovery
386 logger.info(
387 "[SRC] Relay discovery service created | auto_reserve=%s",
388 True,
389 )
390
391 # Start discovery service
392 async with background_trio_service(discovery):
393 logger.info("Relay discovery service started")
394
395 # Connect to the relay
396 logger.info(f"Connecting to relay at {relay_addr}")
397 try:
398 # Handle both peer ID only or full multiaddr formats
399 if relay_addr.startswith("/"):
400 # Full multiaddr format
401 relay_maddr = multiaddr.Multiaddr(relay_addr)
402 relay_info = info_from_p2p_addr(relay_maddr)
403 else:
404 # Assume it's just a peer ID
405 relay_peer_id = ID.from_string(relay_addr)
406 relay_info = PeerInfo(
407 relay_peer_id,
408 [
409 multiaddr.Multiaddr(
410 f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}"
411 )
412 ],
413 )
414 logger.info(f"Using constructed address: {relay_info.addrs[0]}")
415
416 logger.debug(
417 "[SRC] attempting host.connect to relay %s", relay_info.peer_id
418 )
419 await host.connect(relay_info)
420 logger.info(f"Connected to relay {relay_info.peer_id}")
421 try:
422 connected = host.is_peer_connected(relay_info.peer_id) # type: ignore[attr-defined]
423 logger.debug("[SRC] relay connected? %s", connected)
424 except Exception:
425 pass
426
427 # Wait for relay discovery to find the relay
428 await trio.sleep(2)
429 try:
430 relays = transport.discovery.get_relays()
431 logger.debug("[SRC] discovered relays: %s", relays)
432 except Exception:
433 pass
434
435 # Convert destination ID string to peer ID
436 dest_peer_id = ID.from_string(dest_id)
437
438 # Try to connect to the destination through the relay
439 logger.info(
440 f"Connecting to destination {dest_peer_id} through relay"
441 )
442
443 # Create peer info with relay
444 relay_peer_id = relay_info.peer_id
445 logger.info(f"This is the relay peer id: {relay_peer_id}")
446
447 # Create a proper peer info with a relay address
448 # The destination peer should be reachable through a
449 # p2p-circuit address
450 circuit_addr = multiaddr.Multiaddr(
451 f"{relay_info.addrs[0]}/p2p-circuit/p2p/{dest_peer_id}"
452 )
453 dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr])
454 logger.info(f"This is the dest peer info: {dest_peer_info}")
455
456 # Dial through the relay
457 try:
458 logger.info(
459 f"Attempting to dial destination {dest_peer_id} "
460 f"through relay {relay_peer_id}"
461 )
462
463 logger.debug(
464 "[SRC] dialing via transport: dest=%s relay=%s",
465 dest_peer_id,
466 relay_peer_id,
467 )
468 connection = await transport.dial(circuit_addr)
469 logger.info("Established relay RawConnection: %s", connection)
470
471 logger.info(
472 "Successfully connected to destination through relay!"
473 )
474
475 # Open a stream to our example protocol
476 logger.debug(
477 "[SRC] opening app stream to %s with %s",
478 dest_peer_id,
479 EXAMPLE_PROTOCOL_ID,
480 )
481 stream = await host.new_stream(
482 dest_peer_id, [EXAMPLE_PROTOCOL_ID]
483 )
484 if stream:
485 logger.info(
486 f"Opened stream to destination with protocol "
487 f"{EXAMPLE_PROTOCOL_ID}"
488 )
489
490 # Send a message
491 msg = f"Hello from {peer_id}!".encode()
492 logger.debug(
493 "[SRC] writing %d bytes on app stream", len(msg)
494 )
495 await stream.write(msg)
496 logger.info("Sent message to destination")
497
498 # Wait for response
499 logger.debug(
500 "[SRC] waiting to read up to %d bytes on app stream",
501 MAX_READ_LEN,
502 )
503 response = await stream.read(MAX_READ_LEN)
504 logger.info(
505 f"Received response: "
506 f"{response.decode() if response else 'No response'}"
507 )
508
509 # Close the stream
510 await stream.close()
511 else:
512 logger.error("Failed to open stream to destination")
513 except Exception as e:
514 logger.exception("[SRC] Failed to dial through relay: %s", e)
515 logger.error(f"Exception type: {type(e).__name__}")
516 raise
517
518 except Exception as e:
519 logger.exception("[SRC] Error: %s", e)
520
521 print("\nSource operation completed")
522 # Keep running for a bit to allow messages to be processed
523 await trio.sleep(5)
524
525
526def generate_fixed_private_key(seed: int | None) -> bytes:
527 """Generate a fixed private key from a seed for reproducible peer IDs."""
528 import random
529
530 if seed is None:
531 # Generate random bytes if no seed provided
532 return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big")
533
534 random.seed(seed)
535 return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big")
536
537
538def main() -> None:
539 """Parse arguments and run the appropriate node type."""
540 parser = argparse.ArgumentParser(description="Circuit Relay v2 Example")
541 parser.add_argument(
542 "--role",
543 type=str,
544 choices=["relay", "source", "destination"],
545 required=True,
546 help="Node role (relay, source, or destination)",
547 )
548 parser.add_argument(
549 "--port",
550 type=int,
551 default=0,
552 help="Port to listen on (for relay and destination nodes)",
553 )
554 parser.add_argument(
555 "--relay-addr",
556 type=str,
557 help="Multiaddress or peer ID of relay node (for destination and source nodes)",
558 )
559 parser.add_argument(
560 "--dest-id",
561 type=str,
562 help="Peer ID of destination node (for source node)",
563 )
564 parser.add_argument(
565 "--seed",
566 type=int,
567 help="Random seed for reproducible peer IDs",
568 )
569 parser.add_argument(
570 "--debug",
571 action="store_true",
572 help="Enable debug logging",
573 )
574
575 args = parser.parse_args()
576
577 # Set log level and libp2p structured logging
578 if args.debug:
579 # Enable verbose console logs
580 logging.getLogger().setLevel(logging.DEBUG)
581 logging.getLogger("libp2p").setLevel(logging.DEBUG)
582 # Also enable libp2p file+console logging via env control, if not set
583 os.environ.setdefault("LIBP2P_DEBUG", "DEBUG")
584 try:
585 libp2p_setup_logging()
586 logger.debug("libp2p logging initialized via utils.logging.setup_logging")
587 except Exception as e:
588 logger.debug(
589 "libp2p logging setup failed: %s — continuing with basicConfig", e
590 )
591
592 try:
593 if args.role == "relay":
594 trio.run(setup_relay_node, args.port, args.seed)
595 elif args.role == "destination":
596 if not args.relay_addr:
597 parser.error("--relay-addr is required for destination role")
598 trio.run(setup_destination_node, args.port, args.relay_addr, args.seed)
599 elif args.role == "source":
600 if not args.relay_addr or not args.dest_id:
601 parser.error("--relay-addr and --dest-id are required for source role")
602 trio.run(setup_source_node, args.relay_addr, args.dest_id, args.seed)
603 except KeyboardInterrupt:
604 print("\nExiting...")
605 except Exception as e:
606 print(f"Error: {e}", file=sys.stderr)
607 sys.exit(1)
608
609
610if __name__ == "__main__":
611 main()