Kademlia DHT Demo

This example demonstrates a Kademlia Distributed Hash Table (DHT) implementation with both value storage/retrieval and content provider advertisement/discovery functionality.

$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ cd examples/kademlia
$ python kademlia.py --mode server
2025-06-13 19:51:25,424 - kademlia-example - INFO - Running in server mode on port 0
2025-06-13 19:51:25,426 - kademlia-example - INFO - Connected to bootstrap nodes: []
2025-06-13 19:51:25,426 - kademlia-example - INFO - To connect to this node, use: --bootstrap /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:25,426 - kademlia-example - INFO - Saved server address to log: /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:25,427 - kademlia-example - INFO - DHT service started in SERVER mode
2025-06-13 19:51:25,427 - kademlia-example - INFO - Stored value 'Hello message from Sumanjeet' with key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:25,427 - kademlia-example - INFO - Successfully advertised as server for content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8

Copy the --bootstrap argument printed after "To connect to this node, use:", open a new terminal in the same folder, and run the client:

$ python kademlia.py --mode client --bootstrap /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:37,022 - kademlia-example - INFO - Running in client mode on port 0
2025-06-13 19:51:37,026 - kademlia-example - INFO - Connected to bootstrap nodes: [<libp2p.peer.id.ID (16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef)>]
2025-06-13 19:51:37,027 - kademlia-example - INFO - DHT service started in CLIENT mode
2025-06-13 19:51:37,027 - kademlia-example - INFO - Looking up key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:37,031 - kademlia-example - INFO - Retrieved value: Hello message from Sumanjeet
2025-06-13 19:51:37,031 - kademlia-example - INFO - Looking for servers of content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8
2025-06-13 19:51:37,035 - kademlia-example - INFO - Found 1 servers for content: ['16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef']

Alternatively, if you run the server first, you can omit --bootstrap. The server saves its address to server_node_addr.txt next to the script, and the client reads the bootstrap address from that file:

$ python kademlia.py --mode client
2025-06-13 19:51:37,022 - kademlia-example - INFO - Running in client mode on port 0
2025-06-13 19:51:37,026 - kademlia-example - INFO - Connected to bootstrap nodes: [<libp2p.peer.id.ID (16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef)>]
2025-06-13 19:51:37,027 - kademlia-example - INFO - DHT service started in CLIENT mode
2025-06-13 19:51:37,027 - kademlia-example - INFO - Looking up key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:37,031 - kademlia-example - INFO - Retrieved value: Hello message from Sumanjeet
2025-06-13 19:51:37,031 - kademlia-example - INFO - Looking for servers of content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8
2025-06-13 19:51:37,035 - kademlia-example - INFO - Found 1 servers for content: ['16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef']

The demo showcases key DHT operations:

  • Value Storage & Retrieval: The server stores a value, and the client retrieves it

  • Content Provider Discovery: The server advertises content, and the client finds providers

  • Peer Discovery: Automatic bootstrap and peer routing using the Kademlia algorithm

  • Network Resilience: Distributed storage across multiple nodes (when available)

Command Line Options

The Kademlia demo supports several command line options for customization:

$ python kademlia.py --help
usage: kademlia.py [-h] [--mode MODE] [--port PORT] [--bootstrap [BOOTSTRAP ...]] [--verbose]

Kademlia DHT example with content server functionality

options:
  -h, --help            show this help message and exit
  --mode MODE           Run as a server or client node (default: server)
  --port PORT           Port to listen on (0 for random) (default: 0)
  --bootstrap [BOOTSTRAP ...]
                        Multiaddrs of bootstrap nodes. Provide a space-separated list of addresses.
                        This is required for client mode.
  --verbose             Enable verbose logging

Examples:

Start server on a specific port:

$ python kademlia.py --mode server --port 8000

Start client with verbose logging:

$ python kademlia.py --mode client --verbose

Connect to multiple bootstrap nodes:

$ python kademlia.py --mode client --bootstrap /ip4/127.0.0.1/tcp/8000/p2p/... /ip4/127.0.0.1/tcp/8001/p2p/...
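
The space-separated form works because the example declares --bootstrap with nargs="*". The following is a minimal sketch of that parsing behaviour using only argparse; the addresses are shortened placeholders for illustration, not complete multiaddrs:

```python
import argparse

# Mirror the --bootstrap option from the example: zero or more values.
parser = argparse.ArgumentParser()
parser.add_argument("--bootstrap", type=str, nargs="*")

# Placeholder addresses for illustration only (no peer ID component).
args = parser.parse_args(
    ["--bootstrap", "/ip4/127.0.0.1/tcp/8000", "/ip4/127.0.0.1/tcp/8001"]
)
print(args.bootstrap)

# When the flag is omitted, argparse leaves the attribute as None,
# which is why the example checks "if bootstrap_addrs:" before iterating.
no_flag = parser.parse_args([])
print(no_flag.bootstrap)
```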

How It Works

The Kademlia DHT implementation demonstrates several key concepts:

Server Mode:
  • Stores key-value pairs in the distributed hash table

  • Advertises itself as a content provider for specific content

  • Handles incoming DHT requests from other nodes

  • Maintains a routing table of known peers
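
A server node keeps the pairs it stores locally, and entries expire after a time-to-live. The sketch below shows one way such a store could behave. TTLStore, its method names, and the default TTL are hypothetical and are not the library's value store; the optional now argument exists only to make the example deterministic.

```python
import time


class TTLStore:
    """Hypothetical in-memory key-value store with per-entry expiry."""

    def __init__(self, default_ttl: float = 3600.0) -> None:
        self.default_ttl = default_ttl
        # Maps key -> (value, absolute expiry time in seconds).
        self._data: dict[bytes, tuple[bytes, float]] = {}

    def put(self, key: bytes, value: bytes, ttl=None, now=None) -> None:
        now = time.time() if now is None else now
        ttl = self.default_ttl if ttl is None else ttl
        self._data[key] = (value, now + ttl)

    def get(self, key: bytes, now=None):
        now = time.time() if now is None else now
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if now >= expires_at:
            # Expired entries are dropped lazily, on read.
            del self._data[key]
            return None
        return value


store = TTLStore()
store.put(b"/example/my-example-key", b"hello", ttl=10, now=0)
fresh = store.get(b"/example/my-example-key", now=5)
stale = store.get(b"/example/my-example-key", now=10)
```

Dropping expired entries on read keeps the sketch short; a long-running node would more likely also sweep the store periodically.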

Client Mode:
  • Connects to bootstrap nodes to join the network

  • Retrieves values by their keys from the DHT

  • Discovers content providers for specific content

  • Performs network lookups using the Kademlia algorithm
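
Kademlia lookups are iterative: the node asks the closest peers it knows for peers even closer to the target, and stops once the closest set has been fully queried. The following is a toy, single-threaded sketch over an in-memory dict. The function name, the small integer IDs, and the network mapping are invented for illustration and are not the library's API; a real lookup sends RPCs concurrently and handles timeouts.

```python
def iterative_find(network, start, target, k=3, alpha=2):
    """Return up to k node IDs closest to target by XOR distance."""

    def distance(node_id):
        return node_id ^ target

    shortlist = set(start)
    queried = set()
    while True:
        closest = sorted(shortlist, key=distance)[:k]
        # Pick at most alpha not-yet-queried nodes among the k closest.
        pending = [n for n in closest if n not in queried][:alpha]
        if not pending:
            return closest
        for node in pending:
            queried.add(node)
            # "Query" the node: it replies with the peers it knows about.
            shortlist.update(network.get(node, ()))


# Hypothetical network: node ID -> IDs of peers that node knows.
network = {
    1: [4, 8],
    4: [5, 7],
    8: [12],
    5: [6],
    7: [6],
    6: [],
    12: [],
}

result = iterative_find(network, start=[1], target=6)
print(result)  # [6, 7, 4]
```

Starting from node 1 alone, the lookup reaches node 6 in three rounds of queries, each round moving closer to the target in XOR distance.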

Key Components:
  • Routing Table: Organizes peers in k-buckets based on XOR distance

  • Value Store: Manages key-value storage with TTL (time-to-live)

  • Provider Store: Tracks which peers provide specific content

  • Peer Routing: Implements iterative lookups to find the peers closest to a key
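
To make the XOR metric behind the routing table concrete, here is a small sketch. It assumes node keys are derived by hashing an identifier with SHA-256 and that a peer's bucket is chosen by the highest bit in which its key differs from the local key. Both are common Kademlia conventions, but the library's exact layout may differ, and the helper names are hypothetical.

```python
import hashlib


def node_key(raw: bytes) -> int:
    """Assumed key derivation: SHA-256 of the identifier, as an integer."""
    return int.from_bytes(hashlib.sha256(raw).digest(), "big")


def xor_distance(a: int, b: int) -> int:
    """Kademlia distance between two keys."""
    return a ^ b


def bucket_index(local: int, other: int) -> int:
    """Position of the highest differing bit (-1 if the keys are equal)."""
    return xor_distance(local, other).bit_length() - 1


# Small 4-bit keys keep the arithmetic easy to follow.
local = 0b1010
peers = [0b1011, 0b0010, 0b1110]

by_distance = sorted(peers, key=lambda p: xor_distance(local, p))
indices = [bucket_index(local, p) for p in peers]

print([bin(p) for p in by_distance])  # ['0b1011', '0b1110', '0b10']
print(indices)  # [0, 3, 2]
```

Peers that share a longer prefix with the local key land in lower-numbered buckets, so each bucket covers a range of the key space half as wide as the next one up.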

The full source code for this example is below:

#!/usr/bin/env python

"""
A basic example of using the Kademlia DHT implementation, with all setup logic inlined.
This example demonstrates both value storage/retrieval and content server
advertisement/discovery.
It also shows how to use custom validators for namespaced keys.
"""

import argparse
import logging
import os
import random
import secrets
import sys

from multiaddr import (
    Multiaddr,
)
import trio

from libp2p import (
    new_host,
)
from libp2p.abc import (
    IHost,
)
from libp2p.crypto.secp256k1 import (
    create_new_key_pair,
)
from libp2p.kad_dht.kad_dht import (
    DHTMode,
    KadDHT,
)
from libp2p.records.validator import Validator
from libp2p.tools.async_service import (
    background_trio_service,
)
from libp2p.tools.utils import (
    info_from_p2p_addr,
)
from libp2p.utils.address_validation import (
    get_available_interfaces,
    get_optimal_binding_address,
)
from libp2p.utils.paths import get_script_dir, join_paths


# Custom validator for the "example" namespace
class ExampleValidator(Validator):
    """
    A simple validator for the 'example' namespace.

    This validator accepts any non-empty value and always selects the first
    value when comparing multiple values.
    """

    def validate(self, key: str, value: bytes) -> None:
        """
        Validate a key-value pair.

        In a real application, you might check:
        - Value format/schema
        - Signatures
        - Size limits
        - etc.
        """
        # For this example, we accept any non-empty value
        # You can add custom validation logic here
        if not value:
            raise ValueError("Value cannot be empty")

    def select(self, key: str, values: list[bytes]) -> int:
        """
        Select the best value from a list of values.

        Returns the index of the selected value.
        In this example, we simply return the first value (index 0).

        In a real application, you might:
        - Compare timestamps
        - Check version numbers
        - Verify signatures and pick the most recent valid one
        """
        return 0


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("kademlia-example")

# The server address file lives in the directory where this script is located
SCRIPT_DIR = get_script_dir(__file__)
SERVER_ADDR_LOG = join_paths(SCRIPT_DIR, "server_node_addr.txt")

# Configure DHT module loggers to inherit from the parent logger
# This ensures all kademlia-example.* loggers use the same configuration
for module in [
    "kad_dht",
    "value_store",
    "peer_routing",
    "routing_table",
    "provider_store",
]:
    child_logger = logging.getLogger(f"kademlia-example.{module}")
    child_logger.setLevel(logging.INFO)
    child_logger.propagate = True  # Allow propagation to parent

# Bootstrap multiaddrs collected from the address file and the command line
bootstrap_nodes = []


async def connect_to_bootstrap_nodes(host: IHost, bootstrap_addrs: list[str]) -> None:
    """
    Connect to the bootstrap nodes provided in the list.

    params: host: The host instance that opens the connections
            bootstrap_addrs: List of bootstrap node addresses

    Returns
    -------
        None

    """
    for addr in bootstrap_addrs:
        try:
            peer_info = info_from_p2p_addr(Multiaddr(addr))
            # Remember the peer's addresses for one hour before dialing
            host.get_peerstore().add_addrs(peer_info.peer_id, peer_info.addrs, 3600)
            await host.connect(peer_info)
        except Exception as e:
            logger.error(f"Failed to connect to bootstrap node {addr}: {e}")


def save_server_addr(addr: str) -> None:
    """Write the server's multiaddress to the log file, replacing its contents."""
    try:
        with open(SERVER_ADDR_LOG, "w") as f:
            f.write(addr + "\n")
        logger.info(f"Saved server address to log: {addr}")
    except Exception as e:
        logger.error(f"Failed to save server address: {e}")


def load_server_addrs() -> list[str]:
    """Load all server multiaddresses from the log file."""
    if not os.path.exists(SERVER_ADDR_LOG):
        return []
    try:
        with open(SERVER_ADDR_LOG) as f:
            return [line.strip() for line in f if line.strip()]
    except Exception as e:
        logger.error(f"Failed to load server addresses: {e}")
        return []


async def run_node(
    port: int, mode: str, bootstrap_addrs: list[str] | None = None
) -> None:
    """Run a DHT node in server or client mode, with setup inlined."""
    try:
        if port <= 0:
            port = random.randint(10000, 60000)
        logger.debug(f"Using port: {port}")

        # Convert string mode to DHTMode enum
        if mode is None or mode.upper() == "CLIENT":
            dht_mode = DHTMode.CLIENT
        elif mode.upper() == "SERVER":
            dht_mode = DHTMode.SERVER
        else:
            logger.error(f"Invalid mode: {mode}. Must be 'client' or 'server'")
            sys.exit(1)

        # Load server addresses for client mode
        if dht_mode == DHTMode.CLIENT:
            server_addrs = load_server_addrs()
            if server_addrs:
                logger.info(f"Loaded {len(server_addrs)} server addresses from log")
                bootstrap_nodes.append(server_addrs[0])  # Use the first server address
            else:
                logger.warning("No server addresses found in log file")

        if bootstrap_addrs:
            for addr in bootstrap_addrs:
                bootstrap_nodes.append(addr)

        key_pair = create_new_key_pair(secrets.token_bytes(32))
        host = new_host(key_pair=key_pair)

        listen_addrs = get_available_interfaces(port)

        async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
            # Start the peer-store cleanup task
            nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

            # Get all available addresses with peer ID
            all_addrs = host.get_addrs()

            logger.info("Listener ready, listening on:")
            for addr in all_addrs:
                logger.info(f"{addr}")

            # Use optimal address for the bootstrap command
            optimal_addr = get_optimal_binding_address(port)
            optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
            bootstrap_cmd = f"--bootstrap {optimal_addr_with_peer}"
            logger.info("To connect to this node, use: %s", bootstrap_cmd)

            await connect_to_bootstrap_nodes(host, bootstrap_nodes)
            dht = KadDHT(host, dht_mode)

            # Register a custom validator for the "example" namespace
            # This allows us to store values with keys like "/example/my-key"
            dht.register_validator("example", ExampleValidator())
            logger.info("Registered custom 'example' namespace validator")

            # Take all peer IDs from the host's peerstore and add them to the DHT
            for known_peer_id in host.get_peerstore().peer_ids():
                await dht.routing_table.add_peer(known_peer_id)
            logger.info(f"Connected to bootstrap nodes: {host.get_connected_peers()}")

            # Save server address in server mode
            if dht_mode == DHTMode.SERVER:
                save_server_addr(str(optimal_addr_with_peer))

            # Start the DHT service
            async with background_trio_service(dht):
                logger.info(f"DHT service started in {dht_mode.value} mode")

                # Example 1: Simple Key-Value Storage with namespaced key
                # Keys MUST be namespaced (e.g., /namespace/key) for validation
                # The namespace must have a registered validator
                key = "/example/my-example-key"
                value = b"Hello from py-libp2p!"

                # Example 2: Content Provider Advertisement
                # Provider keys use a different storage mechanism (provider store)
                # that doesn't go through the value validation path
                content_id = "my-content-identifier"

                if dht_mode == DHTMode.SERVER:
                    # Store key-value pair in the DHT
                    await dht.put_value(key, value)
                    logger.info(f"Stored value: {value.decode()} with key: {key}")

                    # Advertise as a provider for content
                    success = await dht.provide(content_id)
                    if success:
                        logger.info(f"Advertised as provider for content: {content_id}")
                    else:
                        logger.warning("Failed to advertise as provider")

                else:
                    # Retrieve value from DHT using the same key
                    logger.info(f"Looking up key: {key}")
                    retrieved_value = await dht.get_value(key)
                    if retrieved_value:
                        logger.info(f"Retrieved value: {retrieved_value.decode()}")
                    else:
                        logger.warning("Failed to retrieve value")

                    # Find providers for content
                    logger.info(f"Looking for providers of content: {content_id}")
                    providers = await dht.find_providers(content_id)
                    if providers:
                        logger.info(
                            f"Found {len(providers)} providers: "
                            f"{[p.peer_id.pretty() for p in providers]}"
                        )
                    else:
                        logger.warning("No providers found")

                # Keep the node running and report status every 10 seconds
                while True:
                    logger.info(
                        "Status - Connected peers: %d, "
                        "Peers in store: %d, Values in store: %d",
                        len(dht.host.get_connected_peers()),
                        len(dht.host.get_peerstore().peer_ids()),
                        len(dht.value_store.store),
                    )
                    await trio.sleep(10)

    except Exception as e:
        logger.error(f"Node error: {e}", exc_info=True)
        sys.exit(1)


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Kademlia DHT example with content server functionality"
    )
    parser.add_argument(
        "--mode",
        default="server",
        help="Run as a server or client node",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=0,
        help="Port to listen on (0 for random)",
    )
    parser.add_argument(
        "--bootstrap",
        type=str,
        nargs="*",
        help=(
            "Multiaddrs of bootstrap nodes. "
            "Provide a space-separated list of addresses. "
            "This is required for client mode."
        ),
    )
    # Option to enable verbose (DEBUG-level) logging
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging",
    )

    args = parser.parse_args()
    # Set logging level based on verbosity
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    return args


def main():
    """Main entry point for the kademlia demo."""
    try:
        args = parse_args()
        logger.info(
            "Running in %s mode on port %d",
            args.mode,
            args.port,
        )
        trio.run(run_node, args.port, args.mode, args.bootstrap)
    except Exception as e:
        logger.critical(f"Script failed: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
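
The ExampleValidator docstring suggests that a real select() might compare version numbers. The sketch below shows one way that could look. The b"<version>:<payload>" value format and the class name are assumptions made for illustration; the class is kept standalone so it runs without libp2p installed.

```python
class VersionedValidator:
    """Hypothetical validator for values shaped like b"<version>:<payload>"."""

    def validate(self, key: str, value: bytes) -> None:
        # Reject values without a numeric version prefix and a payload.
        version, sep, payload = value.partition(b":")
        if not sep or not version.isdigit() or not payload:
            raise ValueError("expected b'<version>:<payload>'")

    def select(self, key: str, values: list[bytes]) -> int:
        # Return the index of the value carrying the highest version.
        if not values:
            raise ValueError("no values to select from")
        versions = [int(v.partition(b":")[0]) for v in values]
        return versions.index(max(versions))


validator = VersionedValidator()
validator.validate("/example/my-example-key", b"2:hello")
best = validator.select("/example/my-example-key", [b"1:a", b"3:c", b"2:b"])
print(best)  # 1
```

To use something like this in the example, it would presumably subclass Validator and be registered with dht.register_validator, the same way ExampleValidator is.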