Interoperability Testing

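This example provides a standalone console script for testing libp2p ping functionality without Docker or Redis dependencies. It supports both listener and dialer roles and measures ping RTT and handshake times. Status messages are written to stderr; the dialer prints its measurements as a single JSON line on stdout.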

Usage

Run as listener (waits for connection):

$ python -m examples.interop.local_ping_test --listener --port 8000
Listener ready, listening on:
  /ip4/127.0.0.1/tcp/8000/p2p/Qm...
Waiting for dialer to connect...

Run as dialer (connects to listener):

$ python -m examples.interop.local_ping_test --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...
Connecting to listener at: /ip4/127.0.0.1/tcp/8000/p2p/Qm...
Connected successfully
Performing ping test
{"handshakePlusOneRTTMillis": 15.2, "pingRTTMilllis": 2.1}

Options

  • --listener: Run as listener (wait for connection)

  • --dialer: Run as dialer (connect to listener)

  • --destination ADDR: Destination multiaddr (required for dialer)

  • --port PORT: Port number (0 = auto-select)

  • --transport {tcp,ws,quic-v1}: Transport protocol (default: tcp)

  • --muxer {mplex,yamux}: Stream muxer (default: mplex)

  • --security {noise,plaintext}: Security protocol (default: noise)

  • --test-timeout SECONDS: Test timeout in seconds (default: 180; values above 300 are capped at 300)

  • --debug: Enable debug logging

The full source code for this example is below:

  1#!/usr/bin/env python3
  2"""
  3Local libp2p ping test implementation.
  4
  5This is a standalone console script version of the transport-interop ping test
  6that runs without Docker or Redis dependencies. It supports both listener and
  7dialer roles and measures ping RTT and handshake times.
  8
  9Usage:
 10    # Run as listener (waits for connection)
 11    python local_ping_test.py --listener --port 8000
 12
 13    # Run as dialer (connects to listener)
 14    python local_ping_test.py --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...
 15"""
 16
 17import argparse
 18import json
 19import logging
 20import sys
 21import time
 22
 23import multiaddr
 24import trio
 25
 26from libp2p import create_mplex_muxer_option, create_yamux_muxer_option, new_host
 27from libp2p.crypto.ed25519 import create_new_key_pair
 28from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
 29from libp2p.custom_types import TProtocol
 30from libp2p.network.stream.net_stream import INetStream
 31from libp2p.peer.peerinfo import info_from_p2p_addr
 32from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
 33from libp2p.security.noise.transport import (
 34    PROTOCOL_ID as NOISE_PROTOCOL_ID,
 35    Transport as NoiseTransport,
 36)
 37from libp2p.utils.address_validation import get_available_interfaces
 38
# Protocol ID used to register the ping handler and to open ping streams.
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
# Number of bytes sent per ping and requested per read of the echo.
PING_LENGTH = 32
# Upper bound (seconds) applied to the requested test timeout in PingTest.__init__.
MAX_TEST_TIMEOUT = 300
# Lower bound (seconds) for the ping response timeout: PingTest.__init__ takes
# max() of this value and 60% of the (capped) test timeout.
DEFAULT_RESP_TIMEOUT = 30

# Module-level logger; configure_logging() adjusts the level of the logger
# registered under this same name.
logger = logging.getLogger("libp2p.ping_test")
 45
 46
 47def configure_logging(debug: bool = False):
 48    """Configure logging based on debug flag."""
 49    # Set up basic handler on root logger if not already configured
 50    root_logger = logging.getLogger()
 51    if not root_logger.handlers:
 52        handler = logging.StreamHandler(sys.stderr)
 53        formatter = logging.Formatter(
 54            "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
 55        )
 56        handler.setFormatter(formatter)
 57        root_logger.addHandler(handler)
 58        root_logger.setLevel(logging.DEBUG if debug else logging.INFO)
 59
 60    if debug:
 61        # Set DEBUG level for all relevant loggers (they will propagate to root)
 62        logger_names = [
 63            "libp2p.ping_test",
 64            "libp2p",
 65            "libp2p.transport",
 66            "libp2p.transport.quic",
 67            "libp2p.transport.quic.connection",
 68            "libp2p.transport.quic.listener",
 69            "libp2p.network",
 70            "libp2p.network.connection",
 71            "libp2p.network.connection.swarm_connection",
 72            "libp2p.protocol_muxer",
 73            "libp2p.protocol_muxer.multiselect",
 74            "libp2p.host",
 75            "libp2p.host.basic_host",
 76        ]
 77        for logger_name in logger_names:
 78            logger = logging.getLogger(logger_name)
 79            logger.setLevel(logging.DEBUG)
 80            # Ensure propagation is enabled (default, but be explicit)
 81            logger.propagate = True
 82        print("Debug logging enabled", file=sys.stderr)
 83    else:
 84        root_logger.setLevel(logging.INFO)
 85        logging.getLogger("libp2p.ping_test").setLevel(logging.INFO)
 86        # Suppress verbose logs from dependencies
 87        for logger_name in [
 88            "multiaddr",
 89            "multiaddr.transforms",
 90            "multiaddr.codecs",
 91            "libp2p",
 92            "libp2p.transport",
 93        ]:
 94            logging.getLogger(logger_name).setLevel(logging.WARNING)
 95
 96
 97class PingTest:
 98    def __init__(
 99        self,
100        transport: str = "tcp",
101        muxer: str = "mplex",
102        security: str = "noise",
103        port: int = 0,
104        destination: str | None = None,
105        test_timeout: int = 180,
106        debug: bool = False,
107    ):
108        """Initialize ping test with configuration."""
109        self.transport = transport
110        self.muxer = muxer
111        self.security = security
112        self.port = port
113        self.destination = destination
114        self.is_dialer = destination is not None
115
116        raw_timeout = int(test_timeout)
117        self.test_timeout_seconds = min(raw_timeout, MAX_TEST_TIMEOUT)
118        self.resp_timeout = max(
119            DEFAULT_RESP_TIMEOUT, int(self.test_timeout_seconds * 0.6)
120        )
121        self.debug = debug
122
123        self.host = None
124        self.ping_received = False
125
126    def validate_configuration(self) -> None:
127        """Validate configuration parameters."""
128        valid_transports = ["tcp", "ws", "quic-v1"]
129        valid_security = ["noise", "plaintext"]
130        valid_muxers = ["mplex", "yamux"]
131
132        if self.transport not in valid_transports:
133            raise ValueError(
134                f"Unsupported transport: {self.transport}. "
135                f"Supported: {valid_transports}"
136            )
137        if self.security not in valid_security:
138            raise ValueError(
139                f"Unsupported security: {self.security}. Supported: {valid_security}"
140            )
141        if self.muxer not in valid_muxers:
142            raise ValueError(
143                f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}"
144            )
145
146    def create_security_options(self):
147        """Create security options based on configuration."""
148        key_pair = create_new_key_pair()
149
150        if self.security == "noise":
151            noise_key_pair = create_new_x25519_key_pair()
152            transport = NoiseTransport(
153                libp2p_keypair=key_pair,
154                noise_privkey=noise_key_pair.private_key,
155                early_data=None,
156            )
157            return {NOISE_PROTOCOL_ID: transport}, key_pair
158        elif self.security == "plaintext":
159            transport = InsecureTransport(
160                local_key_pair=key_pair,
161                secure_bytes_provider=None,
162                peerstore=None,
163            )
164            return {PLAINTEXT_PROTOCOL_ID: transport}, key_pair
165        else:
166            raise ValueError(f"Unsupported security: {self.security}")
167
168    def create_muxer_options(self):
169        """Create muxer options based on configuration."""
170        if self.muxer == "yamux":
171            return create_yamux_muxer_option()
172        elif self.muxer == "mplex":
173            return create_mplex_muxer_option()
174        else:
175            raise ValueError(f"Unsupported muxer: {self.muxer}")
176
177    def _get_ip_value(self, addr) -> str | None:
178        """Extract IP value from multiaddr (IPv4 or IPv6)."""
179        return addr.value_for_protocol("ip4") or addr.value_for_protocol("ip6")
180
181    def _get_protocol_names(self, addr) -> list:
182        """Get protocol names from multiaddr."""
183        return [p.name for p in addr.protocols()]
184
185    def _build_quic_addr(self, ip_value: str, port: int) -> multiaddr.Multiaddr:
186        """Build QUIC address from IP and port."""
187        is_ipv6 = ":" in ip_value
188        if is_ipv6:
189            base = multiaddr.Multiaddr(f"/ip6/{ip_value}/udp/{port}")
190        else:
191            base = multiaddr.Multiaddr(f"/ip4/{ip_value}/udp/{port}")
192        return base.encapsulate(multiaddr.Multiaddr("/quic-v1"))
193
194    def create_listen_addresses(self, port: int = 0) -> list:
195        """Create listen addresses based on transport type."""
196        base_addrs = get_available_interfaces(port, protocol="tcp")
197
198        if self.transport == "quic-v1":
199            # Convert TCP addresses to UDP/QUIC addresses
200            quic_addrs = []
201            for addr in base_addrs:
202                try:
203                    ip_value = self._get_ip_value(addr)
204                    tcp_port = addr.value_for_protocol("tcp") or port
205                    if ip_value:
206                        quic_addr = self._build_quic_addr(ip_value, tcp_port)
207                        # Preserve /p2p component if present
208                        if "p2p" in self._get_protocol_names(addr):
209                            p2p_value = addr.value_for_protocol("p2p")
210                            if p2p_value:
211                                quic_addr = quic_addr.encapsulate(
212                                    multiaddr.Multiaddr(f"/p2p/{p2p_value}")
213                                )
214                        quic_addrs.append(quic_addr)
215                except Exception as e:
216                    print(
217                        f"Error converting address {addr} to QUIC: {e}",
218                        file=sys.stderr,
219                    )
220            if quic_addrs:
221                return quic_addrs
222            return [self._build_quic_addr("127.0.0.1", port)]
223
224        elif self.transport == "ws":
225            # Add /ws protocol to TCP addresses
226            ws_addrs = []
227            for addr in base_addrs:
228                try:
229                    protocols = self._get_protocol_names(addr)
230                    if "ws" in protocols or "wss" in protocols:
231                        ws_addrs.append(addr)
232                    else:
233                        # Preserve /p2p component
234                        p2p_value = None
235                        if "p2p" in protocols:
236                            p2p_value = addr.value_for_protocol("p2p")
237                            if p2p_value:
238                                addr = addr.decapsulate(
239                                    multiaddr.Multiaddr(f"/p2p/{p2p_value}")
240                                )
241                        ws_addr = addr.encapsulate(multiaddr.Multiaddr("/ws"))
242                        if p2p_value:
243                            ws_addr = ws_addr.encapsulate(
244                                multiaddr.Multiaddr(f"/p2p/{p2p_value}")
245                            )
246                        ws_addrs.append(ws_addr)
247                except Exception as e:
248                    print(
249                        f"Error converting address {addr} to WebSocket: {e}",
250                        file=sys.stderr,
251                    )
252            if ws_addrs:
253                return ws_addrs
254            return [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}/ws")]
255
256        return base_addrs
257
258    def _get_peer_id(self, stream: INetStream) -> str:
259        """Get peer ID from stream, suppressing warnings."""
260        import warnings
261
262        with warnings.catch_warnings():
263            warnings.simplefilter("ignore")
264            try:
265                return str(stream.muxed_conn.peer_id)  # type: ignore
266            except (AttributeError, Exception):
267                return "unknown"
268
269    async def handle_ping(self, stream: INetStream) -> None:
270        """Handle incoming ping requests."""
271        try:
272            payload = await stream.read(PING_LENGTH)
273            if payload is not None:
274                peer_id = self._get_peer_id(stream)
275                print(f"received ping from {peer_id}", file=sys.stderr)
276                await stream.write(payload)
277                print(f"responded with pong to {peer_id}", file=sys.stderr)
278                self.ping_received = True
279        except Exception as e:
280            import traceback
281
282            error_msg = (
283                str(e) if e else "Unknown error (exception object is None or empty)"
284            )
285            error_type = type(e).__name__ if e else "UnknownException"
286            print(f"Error in ping handler: {error_type}: {error_msg}", file=sys.stderr)
287            if self.debug:
288                traceback.print_exc(file=sys.stderr)
289            try:
290                await stream.reset()
291            except Exception:
292                pass
293
294    def log_protocols(self) -> None:
295        """Log registered protocols for debugging."""
296        try:
297            protocols = self.host.get_mux().get_protocols()  # type: ignore
298            protocols_str = [str(p) for p in protocols if p is not None]
299            print(f"Registered protocols: {protocols_str}", file=sys.stderr)
300        except Exception as e:
301            print(f"Error getting protocols: {e}", file=sys.stderr)
302
303    async def send_ping(self, stream: INetStream) -> float:
304        """Send ping and measure RTT."""
305        try:
306            payload = b"\x01" * PING_LENGTH
307            peer_id = self._get_peer_id(stream)
308            print(f"sending ping to {peer_id}", file=sys.stderr)
309
310            ping_start = time.time()
311            await stream.write(payload)
312
313            with trio.fail_after(self.resp_timeout):
314                response = await stream.read(PING_LENGTH)
315                ping_end = time.time()
316
317                if response == payload:
318                    print(f"received pong from {peer_id}", file=sys.stderr)
319                    return (ping_end - ping_start) * 1000
320                else:
321                    raise Exception("Invalid ping response")
322        except Exception as e:
323            print(f"error occurred: {e}", file=sys.stderr)
324            raise
325
326    def _filter_addresses_by_transport(self, addresses: list) -> list:
327        """Filter addresses to match current transport type."""
328        filtered = []
329        for addr in addresses:
330            protocols = self._get_protocol_names(addr)
331            if self.transport == "ws" and ("ws" in protocols or "wss" in protocols):
332                filtered.append(addr)
333            elif self.transport == "quic-v1" and "quic-v1" in protocols:
334                filtered.append(addr)
335            elif self.transport == "tcp" and not any(
336                p in protocols for p in ["ws", "wss", "quic-v1"]
337            ):
338                filtered.append(addr)
339        return filtered if filtered else addresses
340
341    def _get_publishable_address(self, addresses: list) -> str:
342        """Get the best address to publish, preferring non-loopback."""
343        filtered = self._filter_addresses_by_transport(addresses)
344        if not filtered:
345            print(
346                f"Warning: No addresses matched transport {self.transport}, "
347                f"using all addresses",
348                file=sys.stderr,
349            )
350            filtered = addresses
351
352        # Prefer non-loopback addresses
353        for addr in filtered:
354            ip_value = self._get_ip_value(addr)
355            if ip_value and ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]:
356                return str(addr)
357
358        # Fallback: use first address (for localhost testing)
359        return str(filtered[0])
360
361    async def run_listener(self) -> None:
362        """Run the listener role."""
363        self.validate_configuration()
364
365        # Create security and muxer options
366        sec_opt, key_pair = self.create_security_options()
367        muxer_opt = self.create_muxer_options()
368        listen_addrs = self.create_listen_addresses(self.port)
369
370        self.host = new_host(  # type: ignore
371            key_pair=key_pair,
372            sec_opt=sec_opt,
373            muxer_opt=muxer_opt,
374            listen_addrs=listen_addrs,
375            enable_quic=(self.transport == "quic-v1"),
376        )
377        self.host.set_stream_handler(PING_PROTOCOL_ID, self.handle_ping)  # type: ignore
378        self.log_protocols()
379
380        async with self.host.run(listen_addrs=listen_addrs):  # type: ignore
381            all_addrs = self.host.get_addrs()  # type: ignore
382            if not all_addrs:
383                raise RuntimeError("No listen addresses available")
384
385            actual_addr = self._get_publishable_address(all_addrs)
386            print("Listener ready, listening on:", file=sys.stderr)
387            for addr in all_addrs:
388                print(f"  {addr}", file=sys.stderr)
389            print("\nTo connect, use this address:", file=sys.stderr)
390            print(f"  {actual_addr}", file=sys.stderr)
391            print("Waiting for dialer to connect...", file=sys.stderr)
392
393            wait_timeout = min(self.test_timeout_seconds, MAX_TEST_TIMEOUT)
394            check_interval = 0.5
395            elapsed = 0
396
397            while elapsed < wait_timeout:
398                if self.ping_received:
399                    print(
400                        "Ping received and responded, listener exiting",
401                        file=sys.stderr,
402                    )
403                    return
404                await trio.sleep(float(check_interval))  # type: ignore
405                elapsed += check_interval
406
407            if not self.ping_received:
408                print(
409                    f"Timeout: No ping received within {wait_timeout} seconds",
410                    file=sys.stderr,
411                )
412            sys.exit(1)
413
414    def _debug_connection_state(self, network, peer_id) -> None:
415        """Debug connection state (only if debug logging enabled)."""
416        if not self.debug:
417            return
418        try:
419            if hasattr(network, "get_connections_to_peer"):
420                connections = network.get_connections_to_peer(peer_id)
421            elif hasattr(network, "connections"):
422                connections = [
423                    c
424                    for c in network.connections.values()
425                    if c.get_peer_id() == peer_id
426                ]
427            else:
428                connections = []
429            print(
430                f"[DEBUG] Found {len(connections)} connections to peer {peer_id}",
431                file=sys.stderr,
432            )
433            for i, conn in enumerate(connections):
434                muxed = hasattr(conn, "get_muxer")
435                print(
436                    f"[DEBUG] Connection {i}: {type(conn).__name__}, muxed: {muxed}",
437                    file=sys.stderr,
438                )
439                if muxed:
440                    try:
441                        muxer_type = type(conn.get_muxer()).__name__
442                        print(
443                            f"[DEBUG] Connection {i} muxer: {muxer_type}",
444                            file=sys.stderr,
445                        )
446                    except Exception as e:
447                        print(
448                            f"[DEBUG] Connection {i} muxer error: {e}",
449                            file=sys.stderr,
450                        )
451        except Exception as e:
452            print(f"[DEBUG] Error checking connections: {e}", file=sys.stderr)
453
454    async def _create_stream_with_retry(self, peer_id) -> INetStream:
455        """Create ping stream with retry mechanism for connection readiness."""
456        max_retries = 3
457        retry_delay = 0.5
458
459        print("Creating ping stream", file=sys.stderr)
460        if self.debug:
461            print(
462                f"[DEBUG] About to create stream for protocol {PING_PROTOCOL_ID}",
463                file=sys.stderr,
464            )
465
466        for attempt in range(max_retries):
467            try:
468                stream = await self.host.new_stream(peer_id, [PING_PROTOCOL_ID])  # type: ignore
469                print("Ping stream created successfully", file=sys.stderr)
470                return stream
471            except Exception as e:
472                if attempt < max_retries - 1:
473                    if self.debug:
474                        print(
475                            f"[DEBUG] Stream creation attempt {attempt + 1} "
476                            f"failed: {e}, retrying...",
477                            file=sys.stderr,
478                        )
479                    await trio.sleep(retry_delay)
480                else:
481                    if self.debug:
482                        print(
483                            f"[DEBUG] Stream creation failed after {max_retries} "
484                            f"attempts: {e}",
485                            file=sys.stderr,
486                        )
487                    raise
488        raise RuntimeError("Failed to create ping stream after retries")
489
490    async def run_dialer(self) -> None:
491        """Run the dialer role."""
492        print("Running as dialer", file=sys.stderr)
493
494        try:
495            self.validate_configuration()
496
497            if not self.destination:
498                raise ValueError("Destination address is required for dialer mode")
499
500            listener_addr = self.destination
501            print(f"Connecting to listener at: {listener_addr}", file=sys.stderr)
502
503            # Create security and muxer options
504            sec_opt, key_pair = self.create_security_options()
505            muxer_opt = self.create_muxer_options()
506
507            # WS dialer workaround: need listen addresses to register transport
508            # (py-libp2p limitation)
509            dialer_listen_addrs = (
510                self.create_listen_addresses(self.port)
511                if self.transport == "ws"
512                else None
513            )
514            if dialer_listen_addrs:
515                addrs_str = [str(addr) for addr in dialer_listen_addrs]
516                print(
517                    f"Registering WS transport for dialer with addresses: {addrs_str}",
518                    file=sys.stderr,
519                )
520
521            host_kwargs = {
522                "key_pair": key_pair,
523                "sec_opt": sec_opt,
524                "muxer_opt": muxer_opt,
525                "enable_quic": (self.transport == "quic-v1"),
526            }
527            if dialer_listen_addrs:
528                host_kwargs["listen_addrs"] = dialer_listen_addrs  # type: ignore
529
530            self.host = new_host(**host_kwargs)  # type: ignore
531
532            async with self.host.run(listen_addrs=dialer_listen_addrs or []):  # type: ignore
533                handshake_start = time.time()
534                maddr = multiaddr.Multiaddr(listener_addr)
535                info = info_from_p2p_addr(maddr)
536
537                print(f"Connecting to {listener_addr}", file=sys.stderr)
538                if self.debug:
539                    print(
540                        f"[DEBUG] About to call host.connect() for {info.peer_id}",
541                        file=sys.stderr,
542                    )
543                await self.host.connect(info)  # type: ignore
544                print("Connected successfully", file=sys.stderr)
545                if self.debug:
546                    print(
547                        "[DEBUG] host.connect() completed, checking connection state",
548                        file=sys.stderr,
549                    )
550
551                self._debug_connection_state(self.host.get_network(), info.peer_id)  # type: ignore
552
553                # Brief delay to ensure connection is fully ready for stream creation
554                await trio.sleep(0.1)
555
556                # Retry stream creation to handle cases where connection needs more time
557                stream = await self._create_stream_with_retry(info.peer_id)
558
559                print("Performing ping test", file=sys.stderr)
560                ping_rtt = await self.send_ping(stream)
561                print(f"Ping test completed, RTT: {ping_rtt}ms", file=sys.stderr)
562
563                handshake_plus_one_rtt = (time.time() - handshake_start) * 1000
564                result = {
565                    "handshakePlusOneRTTMillis": handshake_plus_one_rtt,
566                    "pingRTTMilllis": ping_rtt,
567                }
568                print(f"Outputting results: {result}", file=sys.stderr)
569                print(json.dumps(result))
570
571                await stream.close()
572                print("Stream closed successfully", file=sys.stderr)
573
574        except Exception as e:
575            print(f"Dialer error: {e}", file=sys.stderr)
576            if self.debug:
577                import traceback
578
579                traceback.print_exc(file=sys.stderr)
580            sys.exit(1)
581
582    async def run(self) -> None:
583        """Main run method."""
584        try:
585            if self.is_dialer:
586                await self.run_dialer()
587            else:
588                await self.run_listener()
589
590        except Exception as e:
591            print(f"Error: {e}", file=sys.stderr)
592            if self.debug:
593                import traceback
594
595                traceback.print_exc(file=sys.stderr)
596            sys.exit(1)
597
598
def main():
    """Parse command-line arguments and run the ping test.

    Exactly one of ``--listener`` / ``--dialer`` must be given. ``--dialer``
    requires ``--destination``. Because ``PingTest`` picks its role from
    whether a destination is set, a destination supplied together with
    ``--listener`` is ignored (with a warning) instead of silently turning
    the process into a dialer.

    Exits with status 0 when interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Local libp2p ping test - standalone console script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run as listener
  python local_ping_test.py --listener --port 8000

  # Run as dialer (connect to listener)
  python local_ping_test.py --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...

  # With custom transport/muxer/security
  python local_ping_test.py --listener --transport ws --muxer yamux --security noise
        """,
    )

    # Mode selection
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        "--listener",
        action="store_true",
        help="Run as listener (wait for connection)",
    )
    mode_group.add_argument(
        "--dialer", action="store_true", help="Run as dialer (connect to listener)"
    )

    # Connection options
    parser.add_argument(
        "-d",
        "--destination",
        type=str,
        help="Destination multiaddr (required for dialer)",
    )
    parser.add_argument(
        "-p", "--port", type=int, default=0, help="Port number (0 = auto-select)"
    )

    # Configuration options
    parser.add_argument(
        "--transport",
        choices=["tcp", "ws", "quic-v1"],
        default="tcp",
        help="Transport protocol (default: tcp)",
    )
    parser.add_argument(
        "--muxer",
        choices=["mplex", "yamux"],
        default="mplex",
        help="Stream muxer (default: mplex)",
    )
    parser.add_argument(
        "--security",
        choices=["noise", "plaintext"],
        default="noise",
        help="Security protocol (default: noise)",
    )

    # Test options
    parser.add_argument(
        "--test-timeout",
        type=int,
        default=180,
        help="Test timeout in seconds (default: 180)",
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")

    args = parser.parse_args()

    # Validate arguments
    if args.dialer and not args.destination:
        parser.error("--destination is required when running as dialer")

    # Only forward the destination in dialer mode; otherwise PingTest would
    # infer the dialer role from it even though --listener was requested.
    destination = args.destination if args.dialer else None
    if args.listener and args.destination:
        print(
            "Warning: --destination is ignored when running as listener",
            file=sys.stderr,
        )

    configure_logging(debug=args.debug)

    ping_test = PingTest(
        transport=args.transport,
        muxer=args.muxer,
        security=args.security,
        port=args.port,
        destination=destination,
        test_timeout=args.test_timeout,
        debug=args.debug,
    )

    try:
        trio.run(ping_test.run)
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(0)
691
692
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()