Kademlia DHT Demo
This example demonstrates a Kademlia Distributed Hash Table (DHT) implementation with both value storage/retrieval and content provider advertisement/discovery functionality.
$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ cd examples/kademlia
$ python kademlia.py --mode server
2025-06-13 19:51:25,424 - kademlia-example - INFO - Running in server mode on port 0
2025-06-13 19:51:25,426 - kademlia-example - INFO - Connected to bootstrap nodes: []
2025-06-13 19:51:25,426 - kademlia-example - INFO - To connect to this node, use: --bootstrap /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:25,426 - kademlia-example - INFO - Saved server address to log: /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:25,427 - kademlia-example - INFO - DHT service started in SERVER mode
2025-06-13 19:51:25,427 - kademlia-example - INFO - Stored value 'Hello message from Sumanjeet' with key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:25,427 - kademlia-example - INFO - Successfully advertised as server for content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8
Copy the --bootstrap argument printed by the server (on the "To connect to this node" line), open a new terminal in the same folder, and run the client:
$ python kademlia.py --mode client --bootstrap /ip4/127.0.0.1/tcp/28910/p2p/16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef
2025-06-13 19:51:37,022 - kademlia-example - INFO - Running in client mode on port 0
2025-06-13 19:51:37,026 - kademlia-example - INFO - Connected to bootstrap nodes: [<libp2p.peer.id.ID (16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef)>]
2025-06-13 19:51:37,027 - kademlia-example - INFO - DHT service started in CLIENT mode
2025-06-13 19:51:37,027 - kademlia-example - INFO - Looking up key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:37,031 - kademlia-example - INFO - Retrieved value: Hello message from Sumanjeet
2025-06-13 19:51:37,031 - kademlia-example - INFO - Looking for servers of content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8
2025-06-13 19:51:37,035 - kademlia-example - INFO - Found 1 servers for content: ['16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef']
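The bootstrap address passed above is a multiaddr: a slash-separated sequence of protocol/value pairs that ends with the server's peer ID. As a rough illustration of that structure, the sketch below splits such a string using plain string handling. `split_bootstrap_addr` is a hypothetical helper written for this page, not part of py-libp2p, and it assumes the simple /ip4/.../tcp/.../p2p/... shape shown in the logs. The demo itself relies on the multiaddr library and info_from_p2p_addr instead.

```python
def split_bootstrap_addr(addr: str) -> dict[str, str]:
    """Split '/ip4/<ip>/tcp/<port>/p2p/<peer id>' into protocol/value pairs.

    Hypothetical helper for illustration only; it does not validate the
    protocols or values the way a real multiaddr parser would.
    """
    parts = addr.strip("/").split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"expected protocol/value pairs, got: {addr}")
    # Pair each protocol name with the value that follows it.
    return dict(zip(parts[0::2], parts[1::2]))


addr = (
    "/ip4/127.0.0.1/tcp/28910/p2p/"
    "16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef"
)
fields = split_bootstrap_addr(addr)
print("host:", fields["ip4"])
print("port:", fields["tcp"])
print("peer:", fields["p2p"])
```

The last component is what lets the client verify that it reached the intended peer, which is why the server prints the full address rather than just a host and port.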
Alternatively, once a server has been started, it saves its address to server_node_addr.txt next to the script, and a client started without --bootstrap reads the bootstrap address from that file automatically:
$ python kademlia.py --mode client
2025-06-13 19:51:37,022 - kademlia-example - INFO - Running in client mode on port 0
2025-06-13 19:51:37,026 - kademlia-example - INFO - Connected to bootstrap nodes: [<libp2p.peer.id.ID (16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef)>]
2025-06-13 19:51:37,027 - kademlia-example - INFO - DHT service started in CLIENT mode
2025-06-13 19:51:37,027 - kademlia-example - INFO - Looking up key: FVDjasarSFDoLPMdgnp1dHSbW2ZAfN8NU2zNbCQeczgP
2025-06-13 19:51:37,031 - kademlia-example - INFO - Retrieved value: Hello message from Sumanjeet
2025-06-13 19:51:37,031 - kademlia-example - INFO - Looking for servers of content: 361f2ed1183bca491b8aec11f0b9e5c06724759b0f7480ae7fb4894901993bc8
2025-06-13 19:51:37,035 - kademlia-example - INFO - Found 1 servers for content: ['16Uiu2HAm7EsNv5vvjPAehGAVfChjYjD63ZHyWogQRdzntSbAg9ef']
The demo showcases key DHT operations:
- Value Storage & Retrieval: The server stores a value, and the client retrieves it
- Content Provider Discovery: The server advertises content, and the client finds providers
- Peer Discovery: Automatic bootstrap and peer routing using the Kademlia algorithm
- Network Resilience: Distributed storage across multiple nodes (when available)
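To make the first two operations concrete, here is a minimal single-process sketch of a value store with expiry and a provider store. It is a toy model under stated assumptions, not how py-libp2p implements its stores: `ToyStore` and its fields are hypothetical names, nothing is sent over the network, and the method names only echo the calls used in the demo (their signatures differ).

```python
from __future__ import annotations

import time


class ToyStore:
    """Single-process stand-in for a DHT node's value and provider stores."""

    def __init__(self, ttl_seconds: float = 3600.0) -> None:
        self.ttl_seconds = ttl_seconds
        # key -> (value, expiry timestamp)
        self._values: dict[str, tuple[bytes, float]] = {}
        # content id -> IDs of peers that advertised it
        self._providers: dict[str, set[str]] = {}

    def put_value(self, key: str, value: bytes, now: float | None = None) -> None:
        now = time.time() if now is None else now
        self._values[key] = (value, now + self.ttl_seconds)

    def get_value(self, key: str, now: float | None = None) -> bytes | None:
        now = time.time() if now is None else now
        entry = self._values.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if now >= expires_at:
            # Expired records are dropped lazily on lookup.
            del self._values[key]
            return None
        return value

    def provide(self, content_id: str, peer_id: str) -> None:
        self._providers.setdefault(content_id, set()).add(peer_id)

    def find_providers(self, content_id: str) -> list[str]:
        return sorted(self._providers.get(content_id, set()))


store = ToyStore(ttl_seconds=60)
store.put_value("/example/my-example-key", b"Hello", now=0)
print(store.get_value("/example/my-example-key", now=30))  # within the TTL
print(store.get_value("/example/my-example-key", now=61))  # past the TTL
store.provide("my-content-identifier", "peer-A")
print(store.find_providers("my-content-identifier"))
```

Note the difference between the two stores: a value record carries the data itself, while a provider record only says who has the content, so the client still has to fetch it from one of the returned peers.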
Command Line Options
The Kademlia demo supports several command line options for customization:
$ python kademlia.py --help
usage: kademlia.py [-h] [--mode MODE] [--port PORT] [--bootstrap [BOOTSTRAP ...]] [--verbose]

Kademlia DHT example with content server functionality

options:
  -h, --help            show this help message and exit
  --mode MODE           Run as a server or client node (default: server)
  --port PORT           Port to listen on (0 for random) (default: 0)
  --bootstrap [BOOTSTRAP ...]
                        Multiaddrs of bootstrap nodes. Provide a space-separated list of addresses.
                        This is required for client mode.
  --verbose             Enable verbose logging
Examples:
Start server on a specific port:
$ python kademlia.py --mode server --port 8000
Start client with verbose logging:
$ python kademlia.py --mode client --verbose
Connect to multiple bootstrap nodes:
$ python kademlia.py --mode client --bootstrap /ip4/127.0.0.1/tcp/8000/p2p/... /ip4/127.0.0.1/tcp/8001/p2p/...
How It Works
The Kademlia DHT implementation demonstrates several key concepts:
- Server Mode:
  - Stores key-value pairs in the distributed hash table
  - Advertises itself as a content provider for specific content
  - Handles incoming DHT requests from other nodes
  - Maintains a routing table with known peers
- Client Mode:
  - Connects to bootstrap nodes to join the network
  - Retrieves values by their keys from the DHT
  - Discovers content providers for specific content
  - Performs network lookups using the Kademlia algorithm
- Key Components:
  - Routing Table: Organizes peers in k-buckets based on XOR distance
  - Value Store: Manages key-value storage with TTL (time-to-live)
  - Provider Store: Tracks which peers provide specific content
  - Peer Routing: Implements iterative lookups to find closest peers
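The XOR distance behind the routing table and lookups can be sketched in a few lines. This is an illustration of the general Kademlia idea, not py-libp2p's routing table: the helper names are hypothetical, hashing names with SHA-256 to obtain 256-bit IDs is an assumption made for the example, and bucket numbering conventions differ between implementations.

```python
import hashlib


def node_id(name: bytes) -> int:
    """Map arbitrary bytes to a 256-bit integer ID (SHA-256 is an assumption)."""
    return int.from_bytes(hashlib.sha256(name).digest(), "big")


def xor_distance(a: int, b: int) -> int:
    """Kademlia distance: the bitwise XOR of two IDs, read as an integer."""
    return a ^ b


def bucket_index(local: int, remote: int) -> int:
    """Position of the highest differing bit; -1 when both IDs are equal."""
    return xor_distance(local, remote).bit_length() - 1


def closest_peers(target: int, peers: list[int], k: int = 3) -> list[int]:
    """Return the k peers with the smallest XOR distance to the target."""
    return sorted(peers, key=lambda p: xor_distance(p, target))[:k]


local = node_id(b"local-node")
peers = [node_id(f"peer-{i}".encode()) for i in range(8)]
target = node_id(b"/example/my-example-key")

for peer in closest_peers(target, peers):
    print(f"bucket {bucket_index(local, peer):3d}  id {peer:064x}")
```

An iterative lookup repeats the `closest_peers` step across the network: it asks the closest peers it knows for peers that are closer still, and stops when a round returns nothing nearer to the target.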
The full source code for this example is below:
#!/usr/bin/env python

"""
A basic example of using the Kademlia DHT implementation, with all setup logic inlined.
This example demonstrates both value storage/retrieval and content server
advertisement/discovery.
It also shows how to use custom validators for namespaced keys.
"""

import argparse
import logging
import os
import random
import secrets
import sys

from multiaddr import (
    Multiaddr,
)
import trio

from libp2p import (
    new_host,
)
from libp2p.abc import (
    IHost,
)
from libp2p.crypto.secp256k1 import (
    create_new_key_pair,
)
from libp2p.kad_dht.kad_dht import (
    DHTMode,
    KadDHT,
)
from libp2p.records.validator import Validator
from libp2p.tools.async_service import (
    background_trio_service,
)
from libp2p.tools.utils import (
    info_from_p2p_addr,
)
from libp2p.utils.paths import get_script_dir, join_paths


# Custom validator for the "example" namespace
class ExampleValidator(Validator):
    """
    A simple validator for the 'example' namespace.

    This validator accepts any non-empty value and always selects the first
    value when comparing multiple values.
    """

    def validate(self, key: str, value: bytes) -> None:
        """
        Validate a key-value pair.

        In a real application, you might check:
        - Value format/schema
        - Signatures
        - Size limits
        - etc.
        """
        # For this example, we accept any non-empty value
        # You can add custom validation logic here
        if not value:
            raise ValueError("Value cannot be empty")

    def select(self, key: str, values: list[bytes]) -> int:
        """
        Select the best value from a list of values.

        Returns the index of the selected value.
        In this example, we simply return the first value (index 0).

        In a real application, you might:
        - Compare timestamps
        - Check version numbers
        - Verify signatures and pick the most recent valid one
        """
        return 0


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("kademlia-example")

# Get the directory where this script is located
SCRIPT_DIR = get_script_dir(__file__)
# File in which the server saves its address for clients to pick up
SERVER_ADDR_LOG = join_paths(SCRIPT_DIR, "server_node_addr.txt")

# Configure DHT module loggers to inherit from the parent logger
# This ensures all kademlia-example.* loggers use the same configuration
for module in [
    "kad_dht",
    "value_store",
    "peer_routing",
    "routing_table",
    "provider_store",
]:
    child_logger = logging.getLogger(f"kademlia-example.{module}")
    child_logger.setLevel(logging.INFO)
    child_logger.propagate = True  # Allow propagation to parent

# Bootstrap addresses collected from the address file and the command line
bootstrap_nodes = []


async def connect_to_bootstrap_nodes(host: IHost, bootstrap_addrs: list[str]) -> None:
    """
    Connect to the bootstrap nodes provided in the list.

    params: host: The host instance to connect from
            bootstrap_addrs: List of bootstrap node addresses

    Returns
    -------
        None

    """
    for addr in bootstrap_addrs:
        try:
            peer_info = info_from_p2p_addr(Multiaddr(addr))
            host.get_peerstore().add_addrs(peer_info.peer_id, peer_info.addrs, 3600)
            await host.connect(peer_info)
        except Exception as e:
            logger.error(f"Failed to connect to bootstrap node {addr}: {e}")


def save_server_addr(addr: str) -> None:
    """Write the server's multiaddress to the log file, replacing old contents."""
    try:
        with open(SERVER_ADDR_LOG, "w") as f:
            f.write(addr + "\n")
        logger.info(f"Saved server address to log: {addr}")
    except Exception as e:
        logger.error(f"Failed to save server address: {e}")


def load_server_addrs() -> list[str]:
    """Load all server multiaddresses from the log file."""
    if not os.path.exists(SERVER_ADDR_LOG):
        return []
    try:
        with open(SERVER_ADDR_LOG) as f:
            return [line.strip() for line in f if line.strip()]
    except Exception as e:
        logger.error(f"Failed to load server addresses: {e}")
        return []


async def run_node(
    port: int, mode: str, bootstrap_addrs: list[str] | None = None
) -> None:
    """Run a node that serves content in the DHT with setup inlined."""
    try:
        if port <= 0:
            port = random.randint(10000, 60000)
        logger.debug(f"Using port: {port}")

        # Convert string mode to DHTMode enum
        if mode is None or mode.upper() == "CLIENT":
            dht_mode = DHTMode.CLIENT
        elif mode.upper() == "SERVER":
            dht_mode = DHTMode.SERVER
        else:
            logger.error(f"Invalid mode: {mode}. Must be 'client' or 'server'")
            sys.exit(1)

        # Load server addresses for client mode
        if dht_mode == DHTMode.CLIENT:
            server_addrs = load_server_addrs()
            if server_addrs:
                logger.info(f"Loaded {len(server_addrs)} server addresses from log")
                bootstrap_nodes.append(server_addrs[0])  # Use the first server address
            else:
                logger.warning("No server addresses found in log file")

        if bootstrap_addrs:
            for addr in bootstrap_addrs:
                bootstrap_nodes.append(addr)

        key_pair = create_new_key_pair(secrets.token_bytes(32))
        host = new_host(key_pair=key_pair)

        from libp2p.utils.address_validation import (
            get_available_interfaces,
            get_optimal_binding_address,
        )

        listen_addrs = get_available_interfaces(port)

        async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
            # Start the peer-store cleanup task
            nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)

            peer_id = host.get_id().pretty()

            # Get all available addresses with peer ID
            all_addrs = host.get_addrs()

            logger.info("Listener ready, listening on:")
            for addr in all_addrs:
                logger.info(f"{addr}")

            # Use optimal address for the bootstrap command
            optimal_addr = get_optimal_binding_address(port)
            optimal_addr_with_peer = f"{optimal_addr}/p2p/{host.get_id().to_string()}"
            bootstrap_cmd = f"--bootstrap {optimal_addr_with_peer}"
            logger.info("To connect to this node, use: %s", bootstrap_cmd)

            await connect_to_bootstrap_nodes(host, bootstrap_nodes)
            dht = KadDHT(host, dht_mode)

            # Register a custom validator for the "example" namespace
            # This allows us to store values with keys like "/example/my-key"
            dht.register_validator("example", ExampleValidator())
            logger.info("Registered custom 'example' namespace validator")

            # Take all peer IDs from the peerstore and add them to the DHT
            # (a separate loop variable avoids shadowing peer_id above)
            for known_peer_id in host.get_peerstore().peer_ids():
                await dht.routing_table.add_peer(known_peer_id)
            logger.info(f"Connected to bootstrap nodes: {host.get_connected_peers()}")

            # Save server address in server mode
            if dht_mode == DHTMode.SERVER:
                save_server_addr(str(optimal_addr_with_peer))

            # Start the DHT service
            async with background_trio_service(dht):
                logger.info(f"DHT service started in {dht_mode.value} mode")

                # Example 1: Simple Key-Value Storage with namespaced key
                # Keys MUST be namespaced (e.g., /namespace/key) for validation
                # The namespace must have a registered validator
                key = "/example/my-example-key"
                value = b"Hello from py-libp2p!"

                # Example 2: Content Provider Advertisement
                # Provider keys use a different storage mechanism (provider store)
                # that doesn't go through the value validation path
                content_id = "my-content-identifier"

                if dht_mode == DHTMode.SERVER:
                    # Store key-value pair in the DHT
                    await dht.put_value(key, value)
                    logger.info(f"Stored value: {value.decode()} with key: {key}")

                    # Advertise as a provider for content
                    success = await dht.provide(content_id)
                    if success:
                        logger.info(f"Advertised as provider for content: {content_id}")
                    else:
                        logger.warning("Failed to advertise as provider")

                else:
                    # Retrieve value from DHT using the same key
                    logger.info(f"Looking up key: {key}")
                    retrieved_value = await dht.get_value(key)
                    if retrieved_value:
                        logger.info(f"Retrieved value: {retrieved_value.decode()}")
                    else:
                        logger.warning("Failed to retrieve value")

                    # Find providers for content
                    logger.info(f"Looking for providers of content: {content_id}")
                    providers = await dht.find_providers(content_id)
                    if providers:
                        logger.info(
                            f"Found {len(providers)} providers: "
                            f"{[p.peer_id.pretty() for p in providers]}"
                        )
                    else:
                        logger.warning("No providers found")

                # Keep the node running
                while True:
                    logger.info(
                        "Status - Connected peers: %d, "
                        "Peers in store: %d, Values in store: %d",
                        len(dht.host.get_connected_peers()),
                        len(dht.host.get_peerstore().peer_ids()),
                        len(dht.value_store.store),
                    )
                    await trio.sleep(10)

    except Exception as e:
        # This handler covers both server and client mode
        logger.error(f"Node error: {e}", exc_info=True)
        sys.exit(1)


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Kademlia DHT example with content server functionality"
    )
    parser.add_argument(
        "--mode",
        default="server",
        help="Run as a server or client node",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=0,
        help="Port to listen on (0 for random)",
    )
    parser.add_argument(
        "--bootstrap",
        type=str,
        nargs="*",
        help=(
            "Multiaddrs of bootstrap nodes. "
            "Provide a space-separated list of addresses. "
            "This is required for client mode."
        ),
    )
    # Add option to use verbose logging
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging",
    )

    args = parser.parse_args()
    # Set logging level based on verbosity
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    return args


def main():
    """Main entry point for the kademlia demo."""
    try:
        args = parse_args()
        logger.info(
            "Running in %s mode on port %d",
            args.mode,
            args.port,
        )
        trio.run(run_node, args.port, args.mode, args.bootstrap)
    except Exception as e:
        logger.critical(f"Script failed: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
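
The select docstring in the listing suggests comparing timestamps instead of always returning index 0. One way that could look is sketched below. Everything specific here is an assumption made for illustration: the record layout (an 8-byte big-endian Unix timestamp followed by the payload), the helper names, and `NewestWinsValidator` itself. To keep the block runnable without libp2p installed it is a plain class that mirrors the validate/select shape of ExampleValidator. In the demo it would subclass Validator and be registered with dht.register_validator in the same way.

```python
import struct


def encode_record(timestamp: int, payload: bytes) -> bytes:
    """Prefix the payload with an 8-byte big-endian Unix timestamp."""
    return struct.pack(">Q", timestamp) + payload


def decode_record(value: bytes) -> tuple[int, bytes]:
    """Split a record into (timestamp, payload)."""
    if len(value) < 8:
        raise ValueError("record shorter than its 8-byte timestamp prefix")
    (timestamp,) = struct.unpack(">Q", value[:8])
    return timestamp, value[8:]


class NewestWinsValidator:
    """Hypothetical validator: rejects empty payloads, prefers newer records."""

    def validate(self, key: str, value: bytes) -> None:
        _, payload = decode_record(value)
        if not payload:
            raise ValueError("Value cannot be empty")

    def select(self, key: str, values: list[bytes]) -> int:
        if not values:
            raise ValueError("no values to select from")
        timestamps = [decode_record(v)[0] for v in values]
        # On a tie, index() returns the first record with the newest timestamp.
        return timestamps.index(max(timestamps))


validator = NewestWinsValidator()
records = [
    encode_record(100, b"old"),
    encode_record(250, b"new"),
    encode_record(180, b"mid"),
]
for record in records:
    validator.validate("/example/my-example-key", record)

best = validator.select("/example/my-example-key", records)
print(best, decode_record(records[best]))
```

A timestamp chosen by the writer is easy to forge, so a production validator would normally combine an ordering rule like this with the signature checks mentioned in the docstring.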