Interoperability Testing
This example provides a standalone console script for testing libp2p ping functionality without Docker or Redis dependencies. It supports both listener and dialer roles and measures ping RTT and handshake times.
Usage
Run as listener (waits for connection):
$ python -m examples.interop.local_ping_test --listener --port 8000
Listener ready, listening on:
/ip4/127.0.0.1/tcp/8000/p2p/Qm...
Waiting for dialer to connect...
Run as dialer (connects to listener):
$ python -m examples.interop.local_ping_test --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...
Connecting to listener at: /ip4/127.0.0.1/tcp/8000/p2p/Qm...
Connected successfully
Performing ping test
{"handshakePlusOneRTTMillis": 15.2, "pingRTTMilllis": 2.1}
Options
--listener: Run as listener (wait for connection)
--dialer: Run as dialer (connect to listener)
--destination ADDR: Destination multiaddr (required for dialer)
--port PORT: Port number (0 = auto-select)
--transport {tcp,ws,quic-v1}: Transport protocol (default: tcp)
--muxer {mplex,yamux}: Stream muxer (default: mplex)
--security {noise,plaintext}: Security protocol (default: noise)
--test-timeout SECONDS: Test timeout in seconds (default: 180)
--debug: Enable debug logging
The full source code for this example is below:
1#!/usr/bin/env python3
2"""
3Local libp2p ping test implementation.
4
5This is a standalone console script version of the transport-interop ping test
6that runs without Docker or Redis dependencies. It supports both listener and
7dialer roles and measures ping RTT and handshake times.
8
9Usage:
10 # Run as listener (waits for connection)
11 python local_ping_test.py --listener --port 8000
12
13 # Run as dialer (connects to listener)
14 python local_ping_test.py --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...
15"""
16
17import argparse
18import json
19import logging
20import sys
21import time
22
23import multiaddr
24import trio
25
26from libp2p import create_mplex_muxer_option, create_yamux_muxer_option, new_host
27from libp2p.crypto.ed25519 import create_new_key_pair
28from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair
29from libp2p.custom_types import TProtocol
30from libp2p.network.stream.net_stream import INetStream
31from libp2p.peer.peerinfo import info_from_p2p_addr
32from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
33from libp2p.security.noise.transport import (
34 PROTOCOL_ID as NOISE_PROTOCOL_ID,
35 Transport as NoiseTransport,
36)
37from libp2p.utils.address_validation import get_available_interfaces
38
# Protocol ID registered by the listener and requested by the dialer.
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
# Size in bytes of the ping payload that is written and read back.
PING_LENGTH = 32
# Upper bound (seconds) applied to the user-supplied test timeout.
MAX_TEST_TIMEOUT = 300
# Lower bound (seconds) for the time the dialer waits for the pong.
DEFAULT_RESP_TIMEOUT = 30

# Module-level logger for this script.
logger = logging.getLogger("libp2p.ping_test")
45
46
def configure_logging(debug: bool = False) -> None:
    """
    Configure the root logger and per-package log levels.

    A stderr handler is attached to the root logger only when it has no
    handlers yet. The root level is always set: DEBUG when ``debug`` is true,
    INFO otherwise.

    :param debug: when true, raise the libp2p loggers to DEBUG; when false,
        keep this script at INFO and quiet the multiaddr/libp2p loggers down
        to WARNING.
    """
    root_logger = logging.getLogger()
    if not root_logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        formatter = logging.Formatter(
            "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
        )
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)
    # Applied even when handlers already exist, so --debug takes effect when
    # another component configured logging first.
    root_logger.setLevel(logging.DEBUG if debug else logging.INFO)

    if debug:
        debug_logger_names = (
            "libp2p.ping_test",
            "libp2p",
            "libp2p.transport",
            "libp2p.transport.quic",
            "libp2p.transport.quic.connection",
            "libp2p.transport.quic.listener",
            "libp2p.network",
            "libp2p.network.connection",
            "libp2p.network.connection.swarm_connection",
            "libp2p.protocol_muxer",
            "libp2p.protocol_muxer.multiselect",
            "libp2p.host",
            "libp2p.host.basic_host",
        )
        for name in debug_logger_names:
            named_logger = logging.getLogger(name)
            named_logger.setLevel(logging.DEBUG)
            # Propagation is the default; set explicitly so records reach the
            # root handler even if something disabled it earlier.
            named_logger.propagate = True
        print("Debug logging enabled", file=sys.stderr)
        return

    logging.getLogger("libp2p.ping_test").setLevel(logging.INFO)
    # Suppress verbose logs from dependencies.
    quiet_logger_names = (
        "multiaddr",
        "multiaddr.transforms",
        "multiaddr.codecs",
        "libp2p",
        "libp2p.transport",
    )
    for name in quiet_logger_names:
        logging.getLogger(name).setLevel(logging.WARNING)
95
96
class PingTest:
    """
    Standalone libp2p ping test that runs as either a listener or a dialer.

    The listener registers a handler for ``PING_PROTOCOL_ID`` and echoes the
    payload it receives. The dialer connects to a listener, sends one ping,
    and prints a JSON object with the measured timings on stdout. All
    progress and diagnostic output goes to stderr.
    """

    def __init__(
        self,
        transport: str = "tcp",
        muxer: str = "mplex",
        security: str = "noise",
        port: int = 0,
        destination: str | None = None,
        test_timeout: int = 180,
        debug: bool = False,
    ):
        """
        Store the test configuration.

        :param transport: one of ``tcp``, ``ws`` or ``quic-v1``.
        :param muxer: one of ``mplex`` or ``yamux``.
        :param security: one of ``noise`` or ``plaintext``.
        :param port: port to listen on.
        :param destination: multiaddr of the listener; when given, the
            instance runs as a dialer, otherwise as a listener.
        :param test_timeout: overall timeout in seconds, capped at
            ``MAX_TEST_TIMEOUT``.
        :param debug: enable extra ``[DEBUG]`` output and tracebacks.
        """
        self.transport = transport
        self.muxer = muxer
        self.security = security
        self.port = port
        self.destination = destination
        # The role is inferred from the presence of a destination.
        self.is_dialer = destination is not None

        raw_timeout = int(test_timeout)
        self.test_timeout_seconds = min(raw_timeout, MAX_TEST_TIMEOUT)
        # Wait for the pong for 60% of the test timeout, but never less than
        # DEFAULT_RESP_TIMEOUT seconds.
        self.resp_timeout = max(
            DEFAULT_RESP_TIMEOUT, int(self.test_timeout_seconds * 0.6)
        )
        self.debug = debug

        self.host = None
        self.ping_received = False

    def validate_configuration(self) -> None:
        """
        Check transport, security and muxer against the supported values.

        :raises ValueError: if any of the three settings is unsupported.
        """
        valid_transports = ["tcp", "ws", "quic-v1"]
        valid_security = ["noise", "plaintext"]
        valid_muxers = ["mplex", "yamux"]

        if self.transport not in valid_transports:
            raise ValueError(
                f"Unsupported transport: {self.transport}. "
                f"Supported: {valid_transports}"
            )
        if self.security not in valid_security:
            raise ValueError(
                f"Unsupported security: {self.security}. Supported: {valid_security}"
            )
        if self.muxer not in valid_muxers:
            raise ValueError(
                f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}"
            )

    def create_security_options(self):
        """
        Build the security transport mapping for the configured protocol.

        :return: tuple of (``{protocol_id: transport}``, freshly generated
            Ed25519 key pair).
        :raises ValueError: if the configured security is unsupported.
        """
        key_pair = create_new_key_pair()

        if self.security == "noise":
            noise_key_pair = create_new_x25519_key_pair()
            transport = NoiseTransport(
                libp2p_keypair=key_pair,
                noise_privkey=noise_key_pair.private_key,
                early_data=None,
            )
            return {NOISE_PROTOCOL_ID: transport}, key_pair
        if self.security == "plaintext":
            transport = InsecureTransport(
                local_key_pair=key_pair,
                secure_bytes_provider=None,
                peerstore=None,
            )
            return {PLAINTEXT_PROTOCOL_ID: transport}, key_pair
        raise ValueError(f"Unsupported security: {self.security}")

    def create_muxer_options(self):
        """
        Build the muxer option for the configured stream muxer.

        :raises ValueError: if the configured muxer is unsupported.
        """
        if self.muxer == "yamux":
            return create_yamux_muxer_option()
        if self.muxer == "mplex":
            return create_mplex_muxer_option()
        raise ValueError(f"Unsupported muxer: {self.muxer}")

    def _get_ip_value(self, addr) -> str | None:
        """
        Extract the IP value from a multiaddr, preferring IPv4 over IPv6.

        Only protocols that are present in ``addr`` are looked up, the same
        way the ``p2p`` lookups in this class are guarded, so an address
        without ``ip4`` still falls through to ``ip6``.

        :return: the IP as a string, or ``None`` if the address has neither.
        """
        present = self._get_protocol_names(addr)
        for proto in ("ip4", "ip6"):
            if proto in present:
                value = addr.value_for_protocol(proto)
                if value:
                    return value
        return None

    def _get_protocol_names(self, addr) -> list:
        """Return the names of all protocols contained in a multiaddr."""
        return [p.name for p in addr.protocols()]

    def _build_quic_addr(self, ip_value: str, port: int) -> multiaddr.Multiaddr:
        """Build a ``/ip{4,6}/<ip>/udp/<port>/quic-v1`` multiaddr."""
        # A colon in the textual form is taken to mean IPv6.
        ip_proto = "ip6" if ":" in ip_value else "ip4"
        base = multiaddr.Multiaddr(f"/{ip_proto}/{ip_value}/udp/{port}")
        return base.encapsulate(multiaddr.Multiaddr("/quic-v1"))

    def create_listen_addresses(self, port: int = 0) -> list:
        """
        Create listen addresses for the configured transport.

        TCP interface addresses are used as the base. For ``quic-v1`` they
        are rewritten to UDP/QUIC addresses, for ``ws`` a ``/ws`` component
        is added; any ``/p2p`` component is preserved at the end. If no
        address can be converted, a loopback address is returned.

        :param port: port to listen on.
        :return: list of multiaddrs to listen on.
        """
        base_addrs = get_available_interfaces(port, protocol="tcp")

        if self.transport == "quic-v1":
            # Convert TCP addresses to UDP/QUIC addresses.
            quic_addrs = []
            for addr in base_addrs:
                try:
                    protocols = self._get_protocol_names(addr)
                    ip_value = self._get_ip_value(addr)
                    # Guard the lookup the same way the p2p lookup is guarded.
                    tcp_port = (
                        addr.value_for_protocol("tcp") if "tcp" in protocols else None
                    ) or port
                    if ip_value:
                        quic_addr = self._build_quic_addr(ip_value, tcp_port)
                        # Preserve /p2p component if present.
                        if "p2p" in protocols:
                            p2p_value = addr.value_for_protocol("p2p")
                            if p2p_value:
                                quic_addr = quic_addr.encapsulate(
                                    multiaddr.Multiaddr(f"/p2p/{p2p_value}")
                                )
                        quic_addrs.append(quic_addr)
                except Exception as e:
                    print(
                        f"Error converting address {addr} to QUIC: {e}",
                        file=sys.stderr,
                    )
            if quic_addrs:
                return quic_addrs
            return [self._build_quic_addr("127.0.0.1", port)]

        if self.transport == "ws":
            # Add /ws protocol to TCP addresses.
            ws_addrs = []
            for addr in base_addrs:
                try:
                    protocols = self._get_protocol_names(addr)
                    if "ws" in protocols or "wss" in protocols:
                        ws_addrs.append(addr)
                        continue

                    # /ws has to come before /p2p, so strip the peer ID,
                    # append /ws, then put the peer ID back.
                    p2p_value = None
                    if "p2p" in protocols:
                        p2p_value = addr.value_for_protocol("p2p")
                    stripped = addr
                    if p2p_value:
                        stripped = addr.decapsulate(
                            multiaddr.Multiaddr(f"/p2p/{p2p_value}")
                        )
                    ws_addr = stripped.encapsulate(multiaddr.Multiaddr("/ws"))
                    if p2p_value:
                        ws_addr = ws_addr.encapsulate(
                            multiaddr.Multiaddr(f"/p2p/{p2p_value}")
                        )
                    ws_addrs.append(ws_addr)
                except Exception as e:
                    print(
                        f"Error converting address {addr} to WebSocket: {e}",
                        file=sys.stderr,
                    )
            if ws_addrs:
                return ws_addrs
            return [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}/ws")]

        return base_addrs

    def _get_peer_id(self, stream: INetStream) -> str:
        """
        Return the remote peer ID of a stream as a string.

        Warnings raised while reading the attribute are suppressed. Returns
        ``"unknown"`` if the peer ID cannot be read.
        """
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            try:
                return str(stream.muxed_conn.peer_id)  # type: ignore
            except Exception:
                return "unknown"

    async def handle_ping(self, stream: INetStream) -> None:
        """
        Handle one incoming ping: read the payload and echo it back.

        Sets ``self.ping_received`` once the pong has been written. On any
        error the problem is reported on stderr and the stream is reset on a
        best-effort basis.
        """
        try:
            payload = await stream.read(PING_LENGTH)
            if payload is not None:
                peer_id = self._get_peer_id(stream)
                print(f"received ping from {peer_id}", file=sys.stderr)
                await stream.write(payload)
                print(f"responded with pong to {peer_id}", file=sys.stderr)
                self.ping_received = True
        except Exception as e:
            import traceback

            # Some exceptions carry no message; fall back to a fixed text so
            # the log line is never empty.
            error_msg = (
                str(e) or "Unknown error (exception object is None or empty)"
            )
            error_type = type(e).__name__
            print(f"Error in ping handler: {error_type}: {error_msg}", file=sys.stderr)
            if self.debug:
                traceback.print_exc(file=sys.stderr)
            try:
                await stream.reset()
            except Exception:
                # Best effort: the stream may already be closed or broken.
                pass

    def log_protocols(self) -> None:
        """Print the protocols registered on the host to stderr."""
        try:
            protocols = self.host.get_mux().get_protocols()  # type: ignore
            protocols_str = [str(p) for p in protocols if p is not None]
            print(f"Registered protocols: {protocols_str}", file=sys.stderr)
        except Exception as e:
            print(f"Error getting protocols: {e}", file=sys.stderr)

    async def send_ping(self, stream: INetStream) -> float:
        """
        Send one ping on ``stream`` and measure the round-trip time.

        The read is bounded by ``self.resp_timeout`` seconds via
        ``trio.fail_after``.

        :return: round-trip time in milliseconds.
        :raises Exception: if the response differs from the payload; any
            error is logged to stderr and re-raised.
        """
        try:
            payload = b"\x01" * PING_LENGTH
            peer_id = self._get_peer_id(stream)
            print(f"sending ping to {peer_id}", file=sys.stderr)

            # Monotonic clock: unaffected by wall-clock adjustments.
            ping_start = time.perf_counter()
            await stream.write(payload)

            with trio.fail_after(self.resp_timeout):
                response = await stream.read(PING_LENGTH)
            ping_end = time.perf_counter()

            if response != payload:
                raise Exception("Invalid ping response")
            print(f"received pong from {peer_id}", file=sys.stderr)
            return (ping_end - ping_start) * 1000
        except Exception as e:
            print(f"error occurred: {e}", file=sys.stderr)
            raise

    def _match_addresses_by_transport(self, addresses: list) -> list:
        """Return only the addresses whose protocols match the transport."""
        matched = []
        for addr in addresses:
            protocols = self._get_protocol_names(addr)
            is_ws = "ws" in protocols or "wss" in protocols
            is_quic = "quic-v1" in protocols
            if self.transport == "ws":
                keep = is_ws
            elif self.transport == "quic-v1":
                keep = is_quic
            elif self.transport == "tcp":
                keep = not (is_ws or is_quic)
            else:
                keep = False
            if keep:
                matched.append(addr)
        return matched

    def _filter_addresses_by_transport(self, addresses: list) -> list:
        """
        Filter addresses to match the current transport type.

        Falls back to the unfiltered list when nothing matches.
        """
        matched = self._match_addresses_by_transport(addresses)
        return matched if matched else addresses

    def _get_publishable_address(self, addresses: list) -> str:
        """
        Pick the address to advertise, preferring non-loopback addresses.

        Addresses matching the configured transport are considered first; if
        none match, a warning is printed and all addresses are considered.

        :raises IndexError: if ``addresses`` is empty.
        """
        if not addresses:
            raise IndexError("No addresses available to publish")

        candidates = self._match_addresses_by_transport(addresses)
        if not candidates:
            print(
                f"Warning: No addresses matched transport {self.transport}, "
                f"using all addresses",
                file=sys.stderr,
            )
            candidates = addresses

        # Prefer non-loopback, non-wildcard addresses.
        for addr in candidates:
            ip_value = self._get_ip_value(addr)
            if ip_value and ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]:
                return str(addr)

        # Fallback: use first address (for localhost testing).
        return str(candidates[0])

    async def run_listener(self) -> None:
        """
        Run the listener role.

        Starts a host, prints its addresses, and polls until a ping has been
        answered or the test timeout elapses. Exits the process with status 1
        on timeout.

        :raises RuntimeError: if the host reports no listen addresses.
        """
        self.validate_configuration()

        # Create security and muxer options.
        sec_opt, key_pair = self.create_security_options()
        muxer_opt = self.create_muxer_options()
        listen_addrs = self.create_listen_addresses(self.port)

        self.host = new_host(  # type: ignore
            key_pair=key_pair,
            sec_opt=sec_opt,
            muxer_opt=muxer_opt,
            listen_addrs=listen_addrs,
            enable_quic=(self.transport == "quic-v1"),
        )
        self.host.set_stream_handler(PING_PROTOCOL_ID, self.handle_ping)  # type: ignore
        self.log_protocols()

        async with self.host.run(listen_addrs=listen_addrs):  # type: ignore
            all_addrs = self.host.get_addrs()  # type: ignore
            if not all_addrs:
                raise RuntimeError("No listen addresses available")

            actual_addr = self._get_publishable_address(all_addrs)
            print("Listener ready, listening on:", file=sys.stderr)
            for addr in all_addrs:
                print(f"  {addr}", file=sys.stderr)
            print("\nTo connect, use this address:", file=sys.stderr)
            print(f"  {actual_addr}", file=sys.stderr)
            print("Waiting for dialer to connect...", file=sys.stderr)

            # Already capped in __init__; re-applied in case the attribute
            # was changed afterwards.
            wait_timeout = min(self.test_timeout_seconds, MAX_TEST_TIMEOUT)
            check_interval = 0.5
            elapsed = 0

            while elapsed < wait_timeout:
                if self.ping_received:
                    print(
                        "Ping received and responded, listener exiting",
                        file=sys.stderr,
                    )
                    return
                await trio.sleep(float(check_interval))  # type: ignore
                elapsed += check_interval

            # The ping may have arrived during the final sleep.
            if not self.ping_received:
                print(
                    f"Timeout: No ping received within {wait_timeout} seconds",
                    file=sys.stderr,
                )
                sys.exit(1)

    def _debug_connection_state(self, network, peer_id) -> None:
        """
        Print the connections to ``peer_id`` and their muxer types.

        Does nothing unless debug output is enabled. Errors are reported on
        stderr and never propagated.
        """
        if not self.debug:
            return
        try:
            if hasattr(network, "get_connections_to_peer"):
                connections = network.get_connections_to_peer(peer_id)
            elif hasattr(network, "connections"):
                connections = [
                    c
                    for c in network.connections.values()
                    if c.get_peer_id() == peer_id
                ]
            else:
                connections = []
            print(
                f"[DEBUG] Found {len(connections)} connections to peer {peer_id}",
                file=sys.stderr,
            )
            for i, conn in enumerate(connections):
                muxed = hasattr(conn, "get_muxer")
                print(
                    f"[DEBUG] Connection {i}: {type(conn).__name__}, muxed: {muxed}",
                    file=sys.stderr,
                )
                if not muxed:
                    continue
                try:
                    muxer_type = type(conn.get_muxer()).__name__
                    print(
                        f"[DEBUG] Connection {i} muxer: {muxer_type}",
                        file=sys.stderr,
                    )
                except Exception as e:
                    print(
                        f"[DEBUG] Connection {i} muxer error: {e}",
                        file=sys.stderr,
                    )
        except Exception as e:
            print(f"[DEBUG] Error checking connections: {e}", file=sys.stderr)

    async def _create_stream_with_retry(self, peer_id) -> INetStream:
        """
        Open a ping stream to ``peer_id``, retrying a few times on failure.

        Up to three attempts are made with a 0.5 s pause between them; the
        exception from the last attempt is re-raised.
        """
        max_retries = 3
        retry_delay = 0.5

        print("Creating ping stream", file=sys.stderr)
        if self.debug:
            print(
                f"[DEBUG] About to create stream for protocol {PING_PROTOCOL_ID}",
                file=sys.stderr,
            )

        for attempt in range(max_retries):
            try:
                stream = await self.host.new_stream(peer_id, [PING_PROTOCOL_ID])  # type: ignore
                print("Ping stream created successfully", file=sys.stderr)
                return stream
            except Exception as e:
                is_last_attempt = attempt >= max_retries - 1
                if is_last_attempt:
                    if self.debug:
                        print(
                            f"[DEBUG] Stream creation failed after {max_retries} "
                            f"attempts: {e}",
                            file=sys.stderr,
                        )
                    raise
                if self.debug:
                    print(
                        f"[DEBUG] Stream creation attempt {attempt + 1} "
                        f"failed: {e}, retrying...",
                        file=sys.stderr,
                    )
                await trio.sleep(retry_delay)
        # Only reachable if max_retries is set to 0.
        raise RuntimeError("Failed to create ping stream after retries")

    async def run_dialer(self) -> None:
        """
        Run the dialer role.

        Connects to ``self.destination``, opens a ping stream, sends one ping
        and prints the timings as JSON on stdout. Any failure is reported on
        stderr and the process exits with status 1.
        """
        print("Running as dialer", file=sys.stderr)

        try:
            self.validate_configuration()

            if not self.destination:
                raise ValueError("Destination address is required for dialer mode")

            listener_addr = self.destination
            print(f"Connecting to listener at: {listener_addr}", file=sys.stderr)

            # Create security and muxer options.
            sec_opt, key_pair = self.create_security_options()
            muxer_opt = self.create_muxer_options()

            # WS dialer workaround: need listen addresses to register transport
            # (py-libp2p limitation)
            dialer_listen_addrs = (
                self.create_listen_addresses(self.port)
                if self.transport == "ws"
                else None
            )
            if dialer_listen_addrs:
                addrs_str = [str(addr) for addr in dialer_listen_addrs]
                print(
                    f"Registering WS transport for dialer with addresses: {addrs_str}",
                    file=sys.stderr,
                )

            host_kwargs = {
                "key_pair": key_pair,
                "sec_opt": sec_opt,
                "muxer_opt": muxer_opt,
                "enable_quic": (self.transport == "quic-v1"),
            }
            if dialer_listen_addrs:
                host_kwargs["listen_addrs"] = dialer_listen_addrs  # type: ignore

            self.host = new_host(**host_kwargs)  # type: ignore

            async with self.host.run(listen_addrs=dialer_listen_addrs or []):  # type: ignore
                # Monotonic clock: unaffected by wall-clock adjustments.
                handshake_start = time.perf_counter()
                maddr = multiaddr.Multiaddr(listener_addr)
                info = info_from_p2p_addr(maddr)

                print(f"Connecting to {listener_addr}", file=sys.stderr)
                if self.debug:
                    print(
                        f"[DEBUG] About to call host.connect() for {info.peer_id}",
                        file=sys.stderr,
                    )
                await self.host.connect(info)  # type: ignore
                print("Connected successfully", file=sys.stderr)
                if self.debug:
                    print(
                        "[DEBUG] host.connect() completed, checking connection state",
                        file=sys.stderr,
                    )

                self._debug_connection_state(self.host.get_network(), info.peer_id)  # type: ignore

                # Brief delay to ensure connection is fully ready for stream creation
                await trio.sleep(0.1)

                # Retry stream creation to handle cases where connection needs more time
                stream = await self._create_stream_with_retry(info.peer_id)

                print("Performing ping test", file=sys.stderr)
                ping_rtt = await self.send_ping(stream)
                print(f"Ping test completed, RTT: {ping_rtt}ms", file=sys.stderr)

                # NOTE: this span also contains the fixed 0.1 s delay above
                # and any stream-creation retry pauses.
                handshake_plus_one_rtt = (
                    time.perf_counter() - handshake_start
                ) * 1000
                result = {
                    "handshakePlusOneRTTMillis": handshake_plus_one_rtt,
                    "pingRTTMilllis": ping_rtt,
                }
                print(f"Outputting results: {result}", file=sys.stderr)
                # The JSON result is the only thing written to stdout.
                print(json.dumps(result))

                await stream.close()
                print("Stream closed successfully", file=sys.stderr)

        except Exception as e:
            print(f"Dialer error: {e}", file=sys.stderr)
            if self.debug:
                import traceback

                traceback.print_exc(file=sys.stderr)
            sys.exit(1)

    async def run(self) -> None:
        """
        Run the test in the configured role.

        Any exception is reported on stderr and the process exits with
        status 1.
        """
        try:
            if self.is_dialer:
                await self.run_dialer()
            else:
                await self.run_listener()

        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            if self.debug:
                import traceback

                traceback.print_exc(file=sys.stderr)
            sys.exit(1)
597
598
def main():
    """
    Parse command-line arguments and run the ping test.

    Exactly one of ``--listener`` / ``--dialer`` must be given. The
    destination is forwarded to ``PingTest`` only in dialer mode, because
    ``PingTest`` treats any destination as a request to dial; a destination
    passed together with ``--listener`` is ignored with a warning.

    Exits with status 0 when interrupted by the user.
    """
    parser = argparse.ArgumentParser(
        description="Local libp2p ping test - standalone console script",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run as listener
  python local_ping_test.py --listener --port 8000

  # Run as dialer (connect to listener)
  python local_ping_test.py --dialer --destination /ip4/127.0.0.1/tcp/8000/p2p/Qm...

  # With custom transport/muxer/security
  python local_ping_test.py --listener --transport ws --muxer yamux --security noise
        """,
    )

    # Mode selection
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        "--listener",
        action="store_true",
        help="Run as listener (wait for connection)",
    )
    mode_group.add_argument(
        "--dialer", action="store_true", help="Run as dialer (connect to listener)"
    )

    # Connection options
    parser.add_argument(
        "-d",
        "--destination",
        type=str,
        help="Destination multiaddr (required for dialer)",
    )
    parser.add_argument(
        "-p", "--port", type=int, default=0, help="Port number (0 = auto-select)"
    )

    # Configuration options
    parser.add_argument(
        "--transport",
        choices=["tcp", "ws", "quic-v1"],
        default="tcp",
        help="Transport protocol (default: tcp)",
    )
    parser.add_argument(
        "--muxer",
        choices=["mplex", "yamux"],
        default="mplex",
        help="Stream muxer (default: mplex)",
    )
    parser.add_argument(
        "--security",
        choices=["noise", "plaintext"],
        default="noise",
        help="Security protocol (default: noise)",
    )

    # Test options
    parser.add_argument(
        "--test-timeout",
        type=int,
        default=180,
        help="Test timeout in seconds (default: 180)",
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")

    args = parser.parse_args()

    # Validate arguments
    if args.dialer and not args.destination:
        parser.error("--destination is required when running as dialer")

    # Without this, "--listener --destination X" would run as a dialer.
    destination = args.destination if args.dialer else None
    if args.listener and args.destination:
        print(
            "Warning: --destination is ignored when running as listener",
            file=sys.stderr,
        )

    configure_logging(debug=args.debug)

    ping_test = PingTest(
        transport=args.transport,
        muxer=args.muxer,
        security=args.security,
        port=args.port,
        destination=destination,
        test_timeout=args.test_timeout,
        debug=args.debug,
    )

    try:
        trio.run(ping_test.run)
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(0)
691
692
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()