Circuit Relay v2 Example

This example demonstrates how to use Circuit Relay v2 in py-libp2p.

$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ circuit-relay-demo --role relay --port 8000 --seed 1
Starting relay node...
Listening on: /ip4/0.0.0.0/tcp/8000/p2p/16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN

Copy the relay peer ID (the part after /p2p/ in the "Listening on:" line), open a new terminal in the same folder, and enter the commands below.

$ circuit-relay-demo --role destination --port 8001 --seed 2 --relay-addr 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN
Starting destination node...
Listening on: /ip4/0.0.0.0/tcp/8001/p2p/16Uiu2HAkyfVeK1wARrrgCUkEfohLpDcW34TkXc7r426vGcRhy3h2

# Output after the source node connects and the message is exchanged
Received message (65 bytes): Hello from 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb!
Sent response to 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb

Copy the destination peer ID (the part after /p2p/ in the "Listening on:" line), open a new terminal in the same folder, and enter the commands below.

$ circuit-relay-demo --role source --port 8002 --seed 3 --relay-addr 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN --dest-id 16Uiu2HAkyfVeK1wARrrgCUkEfohLpDcW34TkXc7r426vGcRhy3h2
Starting source node...
Listening on: /ip4/0.0.0.0/tcp/36345/p2p/16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb

# Output after connecting through the relay and exchanging the message
Connected to relay 16Uiu2HAm765pEhEgJJmz5xHLt8Ay2a16qtS9uPzjUGjuCKTAtQYN
Successfully connected to destination through relay!
Sent message to destination
Received response: Hello! This is 16Uiu2HAkyadSbNMy8BuAQLHeyp71kt4CNaUUKUhERB83NsjKTGAb
Source operation completed
  1"""
  2Circuit Relay v2 Example.
  3
  4This example demonstrates using the Circuit Relay v2 protocol by setting up:
  51. A relay node that facilitates connections
  62. A destination node that accepts incoming connections
  73. A source node that connects to the destination through the relay
  8
  9Usage:
 10    # First terminal - start the relay:
 11    python relay_example.py --role relay --port 8000
 12
 13    # Second terminal - start the destination:
 14    python relay_example.py --role destination --port 8001 --relay-addr RELAY_PEER_ID
 15
 16    # Third terminal - start the source:
 17    python relay_example.py --role source \
 18        --relay-addr RELAY_PEER_ID \
 19        --dest-id DESTINATION_PEER_ID
 20"""
 21
 22import argparse
 23import logging
 24import os
 25import sys
 26
 27import multiaddr
 28import trio
 29
 30from libp2p import new_host
 31from libp2p.crypto.secp256k1 import create_new_key_pair
 32from libp2p.custom_types import TProtocol
 33from libp2p.network.stream.net_stream import INetStream
 34from libp2p.peer.id import ID
 35from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
 36from libp2p.relay.circuit_v2.config import RelayConfig, RelayRole
 37from libp2p.relay.circuit_v2.discovery import RelayDiscovery
 38from libp2p.relay.circuit_v2.protocol import (
 39    PROTOCOL_ID,
 40    STOP_PROTOCOL_ID,
 41    CircuitV2Protocol,
 42)
 43from libp2p.relay.circuit_v2.resources import RelayLimits
 44from libp2p.relay.circuit_v2.transport import CircuitV2Transport
 45from libp2p.tools.async_service import background_trio_service
 46from libp2p.utils.logging import setup_logging as libp2p_setup_logging
 47
 48# Configure logging (default console for this example)
 49logging.basicConfig(
 50    level=logging.INFO,
 51    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
 52)
 53logger = logging.getLogger("circuit-relay-example")
 54
 55# Application protocol for our example
 56EXAMPLE_PROTOCOL_ID = TProtocol("/circuit-relay-example/1.0.0")
 57MAX_READ_LEN = 2**16  # 64KB
 58
 59
 60async def handle_example_protocol(stream: INetStream) -> None:
 61    """Handle incoming messages on our example protocol."""
 62    remote_peer_id = stream.muxed_conn.peer_id
 63    try:
 64        remote_addr = stream.get_remote_address()
 65    except Exception:
 66        remote_addr = None
 67    logger.debug(
 68        "[APP] handle_example_protocol: incoming stream |",
 69        "remote_peer=%s | remote_addr=%s | protocol=%s",
 70        remote_peer_id,
 71        remote_addr,
 72        getattr(stream, "protocol_id", None),
 73    )
 74
 75    try:
 76        # Read the incoming message
 77        logger.debug("[APP] waiting to read up to %s bytes from stream", MAX_READ_LEN)
 78        msg = await stream.read(MAX_READ_LEN)
 79        if msg:
 80            logger.info(
 81                "Received message (%d bytes): %s", len(msg), msg.decode(errors="ignore")
 82            )
 83
 84        # Send a response
 85        # Get the local peer ID from the secure connection
 86        local_peer_id = stream.muxed_conn.peer_id
 87        response = f"Hello! This is {local_peer_id}".encode()
 88        logger.debug("[APP] writing %d bytes to stream", len(response))
 89        await stream.write(response)
 90        logger.info("Sent response to %s", remote_peer_id)
 91    except Exception as e:
 92        logger.exception("[APP] Error handling stream: %s", e)
 93    finally:
 94        try:
 95            await stream.close()
 96            logger.debug("[APP] stream closed")
 97        except Exception:
 98            logger.debug("[APP] stream close raised, attempting reset")
 99            try:
100                await stream.reset()
101            except Exception:
102                pass
103
104
async def setup_relay_node(port: int, seed: int | None = None) -> None:
    """
    Set up and run a relay node until cancelled.

    Creates a host, registers the example handler plus the circuit relay hop
    and stop handlers, starts the relay protocol service and then sleeps
    forever.

    :param port: TCP port to listen on (all IPv4 interfaces)
    :param seed: optional seed for a reproducible key pair; ``None`` lets
        ``create_new_key_pair`` pick the key
    """
    logger.info("Starting relay node...")

    # Create host with a fixed key if a seed is provided. The check is
    # ``is not None`` so that ``--seed 0`` is honoured as well.
    key_pair = create_new_key_pair(
        generate_fixed_private_key(seed) if seed is not None else None
    )
    logger.debug("[RELAY] created key_pair=%s", type(key_pair).__name__)
    host = new_host(key_pair=key_pair)
    logger.debug("[RELAY] host initialized | peer_id=%s", host.get_id())

    # Configure the relay
    limits = RelayLimits(
        duration=3600,  # 1 hour
        data=1024 * 1024 * 100,  # 100 MB
        max_circuit_conns=10,
        max_reservations=5,
    )

    relay_config = RelayConfig(
        roles=RelayRole.HOP | RelayRole.STOP | RelayRole.CLIENT,  # All capabilities
        limits=limits,
    )

    # Initialize the protocol
    protocol = CircuitV2Protocol(host, limits=limits, allow_hop=True)
    # Single format string; the two literals are concatenated by the parser
    logger.debug(
        "[RELAY] CircuitV2Protocol initialized | allow_hop=%s | "
        "limits(duration=%s,data=%s,max_circuit_conns=%s,max_reservations=%s)",
        True,
        limits.duration,
        limits.data,
        limits.max_circuit_conns,
        limits.max_reservations,
    )

    # Start the host
    listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")

    async with host.run(listen_addrs=[listen_addr]):
        # Print information about this node
        peer_id = host.get_id()
        logger.info("Relay node started with ID: %s", peer_id)

        addrs = host.get_addrs()
        for addr in addrs:
            logger.info("Listening on: %s", addr)

        # Register protocol handlers
        logger.debug("[RELAY] registering stream handlers")
        host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol)
        host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream)
        host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream)
        logger.debug("[RELAY] protocol handlers registered")

        # Start the relay protocol service
        async with background_trio_service(protocol):
            logger.info("Circuit relay protocol started")

            # Create and register the transport
            CircuitV2Transport(host, protocol, relay_config)
            logger.info(
                "Circuit relay transport initialized | "
                "enable_hop=%r enable_stop=%r enable_client=%r",
                relay_config.enable_hop,
                relay_config.enable_stop,
                relay_config.enable_client,
            )

            print("\nRelay node is running. Use the following address to connect:")
            if addrs:
                print(f"{addrs[0]}")
            else:
                # Avoid an IndexError taking the relay down just for the banner
                logger.warning("Relay host reported no listen addresses")
            print("\nPress Ctrl+C to exit\n")

            # Keep the relay running
            await trio.sleep_forever()
179
180
async def setup_destination_node(
    port: int, relay_addr: str, seed: int | None = None
) -> None:
    """
    Set up and run a destination node that accepts incoming connections.

    Creates a host, registers the example and circuit relay handlers, starts
    the relay protocol and discovery services, connects to the relay and then
    sleeps forever while those services are still running. Returns early if
    the connection to the relay fails.

    :param port: TCP port to listen on (all IPv4 interfaces)
    :param relay_addr: full multiaddr of the relay (starting with ``/``) or
        just its peer ID; for a bare peer ID the relay is dialled at
        ``/ip4/127.0.0.1/tcp/8000``
    :param seed: optional seed for a reproducible key pair; ``None`` lets
        ``create_new_key_pair`` pick the key
    """
    logger.info("Starting destination node...")

    # Create host with a fixed key if a seed is provided. The check is
    # ``is not None`` so that ``--seed 0`` is honoured as well.
    key_pair = create_new_key_pair(
        generate_fixed_private_key(seed) if seed is not None else None
    )
    host = new_host(key_pair=key_pair)
    logger.debug("[DEST] host initialized | peer_id=%s", host.get_id())

    # Configure the circuit relay client
    limits = RelayLimits(
        duration=3600,  # 1 hour
        data=1024 * 1024 * 100,  # 100 MB
        max_circuit_conns=10,
        max_reservations=5,
    )

    relay_config = RelayConfig(
        roles=RelayRole.STOP | RelayRole.CLIENT,  # Accept connections and use relays
        limits=limits,
    )

    # Initialize the protocol
    protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False)
    # Single format string; the two literals are concatenated by the parser
    logger.debug(
        "[DEST] CircuitV2Protocol initialized | allow_hop=%s | "
        "limits(duration=%s,data=%s,max_circuit_conns=%s,max_reservations=%s)",
        False,
        limits.duration,
        limits.data,
        limits.max_circuit_conns,
        limits.max_reservations,
    )

    # Start the host
    listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")

    async with host.run(listen_addrs=[listen_addr]):
        # Print information about this node
        peer_id = host.get_id()
        logger.info("Destination node started with ID: %s", peer_id)

        for addr in host.get_addrs():
            logger.info("Listening on: %s", addr)

        # Register protocol handlers
        logger.debug("[DEST] registering stream handlers")
        host.set_stream_handler(EXAMPLE_PROTOCOL_ID, handle_example_protocol)
        host.set_stream_handler(PROTOCOL_ID, protocol._handle_hop_stream)
        host.set_stream_handler(STOP_PROTOCOL_ID, protocol._handle_stop_stream)
        logger.debug("[DEST] protocol handlers registered")

        # Start the relay protocol service
        async with background_trio_service(protocol):
            logger.info("Circuit relay protocol started")

            # Create and initialize transport
            transport = CircuitV2Transport(host, protocol, relay_config)
            logger.info(
                "Circuit relay transport initialized | "
                "enable_hop=%r enable_stop=%r enable_client=%r",
                relay_config.enable_hop,
                relay_config.enable_stop,
                relay_config.enable_client,
            )

            # Create discovery service
            discovery = RelayDiscovery(host, auto_reserve=True)
            transport.discovery = discovery
            logger.info(
                "[DEST] Relay discovery service created | auto_reserve=%s",
                True,
            )

            # Start discovery service
            async with background_trio_service(discovery):
                logger.info("Relay discovery service started")

                # Connect to the relay
                if relay_addr:
                    logger.info("Connecting to relay at %s", relay_addr)
                    try:
                        # Handle both peer ID only or full multiaddr formats
                        if relay_addr.startswith("/"):
                            # Full multiaddr format
                            relay_maddr = multiaddr.Multiaddr(relay_addr)
                            relay_info = info_from_p2p_addr(relay_maddr)
                        else:
                            # Assume it's just a peer ID; the relay is then
                            # expected on localhost port 8000
                            relay_peer_id = ID.from_string(relay_addr)
                            relay_info = PeerInfo(
                                relay_peer_id,
                                [
                                    multiaddr.Multiaddr(
                                        f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}"
                                    )
                                ],
                            )
                            logger.info(
                                "Using constructed address: %s", relay_info.addrs[0]
                            )

                        logger.debug(
                            "[DEST] attempting host.connect to relay %s",
                            relay_info.peer_id,
                        )
                        await host.connect(relay_info)
                        logger.info("Connected to relay %s", relay_info.peer_id)
                        try:
                            connected = host.is_peer_connected(relay_info.peer_id)  # type: ignore[attr-defined]
                            logger.debug("[DEST] relay connected? %s", connected)
                        except Exception as check_err:
                            # Diagnostic only; never fail the node because of it
                            logger.debug(
                                "[DEST] could not query relay connection state: %s",
                                check_err,
                            )
                    except Exception as e:
                        logger.exception("[DEST] Failed to connect to relay: %s", e)
                        return

                print("\nDestination node is running with peer ID:")
                print(f"{peer_id}")
                print("\nPress Ctrl+C to exit\n")

                # Keep the node running. This must stay inside both service
                # contexts: leaving them would stop the relay protocol and
                # discovery services while the node is still idling.
                await trio.sleep_forever()
307
308
async def setup_source_node(
    relay_addr: str, dest_id: str, seed: int | None = None
) -> None:
    """
    Set up and run a source node that connects to the destination
    through the relay.

    Listens on an ephemeral port, connects to the relay, dials the
    destination over a ``p2p-circuit`` address, exchanges one message on
    ``EXAMPLE_PROTOCOL_ID`` and returns a few seconds later. Failures are
    logged; they are not raised to the caller.

    :param relay_addr: full multiaddr of the relay (starting with ``/``) or
        just its peer ID; for a bare peer ID the relay is dialled at
        ``/ip4/127.0.0.1/tcp/8000``
    :param dest_id: peer ID of the destination node, as a string
    :param seed: optional seed for a reproducible key pair; ``None`` lets
        ``create_new_key_pair`` pick the key
    """
    logger.info("Starting source node...")

    if not relay_addr:
        logger.error("Relay address is required for source mode")
        return

    if not dest_id:
        logger.error("Destination peer ID is required for source mode")
        return

    # Create host with a fixed key if a seed is provided. The check is
    # ``is not None`` so that ``--seed 0`` is honoured as well.
    key_pair = create_new_key_pair(
        generate_fixed_private_key(seed) if seed is not None else None
    )
    host = new_host(key_pair=key_pair)
    logger.debug("[SRC] host initialized | peer_id=%s", host.get_id())

    # Configure the circuit relay client
    limits = RelayLimits(
        duration=3600,  # 1 hour
        data=1024 * 1024 * 100,  # 100 MB
        max_circuit_conns=10,
        max_reservations=5,
    )

    relay_config = RelayConfig(
        roles=RelayRole.STOP | RelayRole.CLIENT,  # Accept connections and use relays
        limits=limits,
    )

    # Initialize the protocol
    protocol = CircuitV2Protocol(host, limits=limits, allow_hop=False)
    # Single format string; the two literals are concatenated by the parser
    logger.debug(
        "[SRC] CircuitV2Protocol initialized | allow_hop=%s | "
        "limits(duration=%d, data=%d, max_circuit_conns=%d, max_reservations=%d)",
        False,
        limits.duration,
        limits.data,
        limits.max_circuit_conns,
        limits.max_reservations,
    )

    # Start the host
    async with host.run(
        listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0")]
    ):  # Use ephemeral port
        # Print information about this node
        peer_id = host.get_id()
        logger.info("Source node started with ID: %s", peer_id)

        # Get assigned address for debugging
        addrs = host.get_addrs()
        if addrs:
            logger.info("Source node listening on: %s", addrs[0])

        # Start the relay protocol service
        async with background_trio_service(protocol):
            logger.info("Circuit relay protocol started")

            # Create and initialize transport
            transport = CircuitV2Transport(host, protocol, relay_config)
            logger.info(
                "Circuit relay transport initialized | "
                "enable_hop=%r enable_stop=%r enable_client=%r",
                relay_config.enable_hop,
                relay_config.enable_stop,
                relay_config.enable_client,
            )

            # Create discovery service
            discovery = RelayDiscovery(host, auto_reserve=True)
            transport.discovery = discovery
            logger.info(
                "[SRC] Relay discovery service created | auto_reserve=%s",
                True,
            )

            # Start discovery service
            async with background_trio_service(discovery):
                logger.info("Relay discovery service started")

                # Connect to the relay
                logger.info("Connecting to relay at %s", relay_addr)
                try:
                    # Handle both peer ID only or full multiaddr formats
                    if relay_addr.startswith("/"):
                        # Full multiaddr format
                        relay_maddr = multiaddr.Multiaddr(relay_addr)
                        relay_info = info_from_p2p_addr(relay_maddr)
                    else:
                        # Assume it's just a peer ID; the relay is then
                        # expected on localhost port 8000
                        relay_peer_id = ID.from_string(relay_addr)
                        relay_info = PeerInfo(
                            relay_peer_id,
                            [
                                multiaddr.Multiaddr(
                                    f"/ip4/127.0.0.1/tcp/8000/p2p/{relay_addr}"
                                )
                            ],
                        )
                        logger.info(
                            "Using constructed address: %s", relay_info.addrs[0]
                        )

                    logger.debug(
                        "[SRC] attempting host.connect to relay %s", relay_info.peer_id
                    )
                    await host.connect(relay_info)
                    logger.info("Connected to relay %s", relay_info.peer_id)
                    try:
                        connected = host.is_peer_connected(relay_info.peer_id)  # type: ignore[attr-defined]
                        logger.debug("[SRC] relay connected? %s", connected)
                    except Exception as check_err:
                        # Diagnostic only; never fail the run because of it
                        logger.debug(
                            "[SRC] could not query relay connection state: %s",
                            check_err,
                        )

                    # Wait for relay discovery to find the relay
                    await trio.sleep(2)
                    try:
                        relays = transport.discovery.get_relays()
                        logger.debug("[SRC] discovered relays: %s", relays)
                    except Exception as relays_err:
                        # Diagnostic only; the dial below does not use this list
                        logger.debug(
                            "[SRC] could not list discovered relays: %s", relays_err
                        )

                    # Convert destination ID string to peer ID
                    dest_peer_id = ID.from_string(dest_id)

                    # Try to connect to the destination through the relay
                    logger.info(
                        "Connecting to destination %s through relay", dest_peer_id
                    )

                    # Create peer info with relay
                    relay_peer_id = relay_info.peer_id
                    logger.info("This is the relay peer id: %s", relay_peer_id)

                    # Create a proper peer info with a relay address
                    # The destination peer should be reachable through a
                    # p2p-circuit address
                    circuit_addr = multiaddr.Multiaddr(
                        f"{relay_info.addrs[0]}/p2p-circuit/p2p/{dest_peer_id}"
                    )
                    dest_peer_info = PeerInfo(dest_peer_id, [circuit_addr])
                    logger.info("This is the dest peer info: %s", dest_peer_info)

                    # Dial through the relay
                    try:
                        logger.info(
                            "Attempting to dial destination %s through relay %s",
                            dest_peer_id,
                            relay_peer_id,
                        )

                        logger.debug(
                            "[SRC] dialing via transport: dest=%s relay=%s",
                            dest_peer_id,
                            relay_peer_id,
                        )
                        connection = await transport.dial(circuit_addr)
                        logger.info("Established relay RawConnection: %s", connection)

                        logger.info(
                            "Successfully connected to destination through relay!"
                        )

                        # Open a stream to our example protocol
                        logger.debug(
                            "[SRC] opening app stream to %s with %s",
                            dest_peer_id,
                            EXAMPLE_PROTOCOL_ID,
                        )
                        stream = await host.new_stream(
                            dest_peer_id, [EXAMPLE_PROTOCOL_ID]
                        )
                        if stream:
                            logger.info(
                                "Opened stream to destination with protocol %s",
                                EXAMPLE_PROTOCOL_ID,
                            )

                            # Send a message
                            msg = f"Hello from {peer_id}!".encode()
                            logger.debug(
                                "[SRC] writing %d bytes on app stream", len(msg)
                            )
                            await stream.write(msg)
                            logger.info("Sent message to destination")

                            # Wait for response
                            logger.debug(
                                "[SRC] waiting to read up to %d bytes on app stream",
                                MAX_READ_LEN,
                            )
                            response = await stream.read(MAX_READ_LEN)
                            logger.info(
                                "Received response: %s",
                                response.decode() if response else "No response",
                            )

                            # Close the stream
                            await stream.close()
                        else:
                            logger.error("Failed to open stream to destination")
                    except Exception as e:
                        logger.exception("[SRC] Failed to dial through relay: %s", e)
                        logger.error("Exception type: %s", type(e).__name__)
                        # Re-raised so the outer handler reports the failure too
                        raise

                except Exception as e:
                    logger.exception("[SRC] Error: %s", e)

                print("\nSource operation completed")
                # Keep running for a bit to allow messages to be processed
                await trio.sleep(5)
524
525
526def generate_fixed_private_key(seed: int | None) -> bytes:
527    """Generate a fixed private key from a seed for reproducible peer IDs."""
528    import random
529
530    if seed is None:
531        # Generate random bytes if no seed provided
532        return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big")
533
534    random.seed(seed)
535    return random.getrandbits(32 * 8).to_bytes(length=32, byteorder="big")
536
537
538def main() -> None:
539    """Parse arguments and run the appropriate node type."""
540    parser = argparse.ArgumentParser(description="Circuit Relay v2 Example")
541    parser.add_argument(
542        "--role",
543        type=str,
544        choices=["relay", "source", "destination"],
545        required=True,
546        help="Node role (relay, source, or destination)",
547    )
548    parser.add_argument(
549        "--port",
550        type=int,
551        default=0,
552        help="Port to listen on (for relay and destination nodes)",
553    )
554    parser.add_argument(
555        "--relay-addr",
556        type=str,
557        help="Multiaddress or peer ID of relay node (for destination and source nodes)",
558    )
559    parser.add_argument(
560        "--dest-id",
561        type=str,
562        help="Peer ID of destination node (for source node)",
563    )
564    parser.add_argument(
565        "--seed",
566        type=int,
567        help="Random seed for reproducible peer IDs",
568    )
569    parser.add_argument(
570        "--debug",
571        action="store_true",
572        help="Enable debug logging",
573    )
574
575    args = parser.parse_args()
576
577    # Set log level and libp2p structured logging
578    if args.debug:
579        # Enable verbose console logs
580        logging.getLogger().setLevel(logging.DEBUG)
581        logging.getLogger("libp2p").setLevel(logging.DEBUG)
582        # Also enable libp2p file+console logging via env control, if not set
583        os.environ.setdefault("LIBP2P_DEBUG", "DEBUG")
584        try:
585            libp2p_setup_logging()
586            logger.debug("libp2p logging initialized via utils.logging.setup_logging")
587        except Exception as e:
588            logger.debug(
589                "libp2p logging setup failed: %s — continuing with basicConfig", e
590            )
591
592    try:
593        if args.role == "relay":
594            trio.run(setup_relay_node, args.port, args.seed)
595        elif args.role == "destination":
596            if not args.relay_addr:
597                parser.error("--relay-addr is required for destination role")
598            trio.run(setup_destination_node, args.port, args.relay_addr, args.seed)
599        elif args.role == "source":
600            if not args.relay_addr or not args.dest_id:
601                parser.error("--relay-addr and --dest-id are required for source role")
602            trio.run(setup_source_node, args.relay_addr, args.dest_id, args.seed)
603    except KeyboardInterrupt:
604        print("\nExiting...")
605    except Exception as e:
606        print(f"Error: {e}", file=sys.stderr)
607        sys.exit(1)
608
609
610if __name__ == "__main__":
611    main()