Filecoin DX Examples
Read this first:
These examples show practical Filecoin-focused workflows using
libp2p.filecoin.
Connect to a Filecoin peer
$ filecoin-connect-demo --network mainnet --resolve-dns --json
1from __future__ import annotations
2
3import argparse
4import json
5import logging
6from typing import Any
7
8import multiaddr
9import trio
10
11from libp2p import new_host
12from libp2p.filecoin import get_network_preset, get_runtime_bootstrap_addresses
13from libp2p.peer.peerinfo import info_from_p2p_addr
14from libp2p.utils.address_validation import find_free_port
15
# Configure the root logger at import time so the demo prints plain
# "LEVEL message" lines, then create this module's logger.
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)
18
19
20def _build_result(
21 network_alias: str,
22 network_name: str,
23 attempted: int,
24 connected: bool,
25 address: str | None,
26 peer_id: str | None,
27 error: str | None,
28) -> dict[str, Any]:
29 return {
30 "network_alias": network_alias,
31 "network_name": network_name,
32 "attempted": attempted,
33 "connected": connected,
34 "address": address,
35 "peer_id": peer_id,
36 "error": error,
37 }
38
39
async def run(
    network: str,
    peer: str | None,
    resolve_dns: bool,
    timeout: float,
    as_json: bool,
) -> int:
    """Dial Filecoin peers one by one until a connection succeeds.

    Args:
        network: Filecoin network alias passed to the preset lookup.
        peer: Explicit multiaddr to dial; when falsy, the runtime bootstrap
            addresses for ``network`` are used instead.
        resolve_dns: Forwarded to ``get_runtime_bootstrap_addresses``.
        timeout: Per-peer dial timeout in seconds.
        as_json: Print the result as JSON instead of logging it.

    Returns:
        ``0`` when a peer was connected, ``1`` otherwise (including when
        there is nothing to dial).
    """
    preset = get_network_preset(network)
    network_name = preset.genesis_network_name

    candidates = (
        [peer]
        if peer
        else get_runtime_bootstrap_addresses(network, resolve_dns=resolve_dns)
    )

    if not candidates:
        result = _build_result(
            network_alias=network,
            network_name=network_name,
            attempted=0,
            connected=False,
            address=None,
            peer_id=None,
            error="no candidate peer addresses available",
        )
        if as_json:
            print(json.dumps(result, indent=2, sort_keys=True))
        else:
            logger.error(result["error"])
        return 1

    listen_port = find_free_port()
    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
    host = new_host(listen_addrs=listen_addrs)

    selected_addr: str | None = None
    selected_peer_id: str | None = None
    last_error: str | None = None
    # Count the dials actually made: the loop stops at the first success, so
    # len(candidates) would over-report whenever an early peer answers.
    attempted = 0

    async with host.run(listen_addrs=listen_addrs):
        for addr in candidates:
            attempted += 1
            try:
                info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
                with trio.fail_after(timeout):
                    await host.connect(info)
                selected_addr = addr
                selected_peer_id = str(info.peer_id)
                break
            except Exception as exc:
                # Some exceptions (notably trio.TooSlowError on timeout)
                # stringify to "", so fall back to the exception class name.
                last_error = str(exc) or type(exc).__name__

    connected = selected_addr is not None
    result = _build_result(
        network_alias=network,
        network_name=network_name,
        attempted=attempted,
        connected=connected,
        address=selected_addr,
        peer_id=selected_peer_id,
        error=None if connected else last_error,
    )

    if as_json:
        print(json.dumps(result, indent=2, sort_keys=True))
    else:
        logger.info("network alias: %s", result["network_alias"])
        logger.info("network name: %s", result["network_name"])
        logger.info("attempted peers: %s", result["attempted"])
        if connected:
            logger.info("connected peer: %s", result["peer_id"])
            logger.info("connected address: %s", result["address"])
        else:
            logger.error("connect failed: %s", result["error"])

    return 0 if connected else 1
116
117
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the connect demo.

    Options: ``--network``, ``--peer``, ``--resolve-dns`` /
    ``--no-resolve-dns``, ``--timeout`` and ``--json``.
    """
    cli = argparse.ArgumentParser(
        description="Connect to a Filecoin peer via py-libp2p runtime bootstrap set.",
    )
    # Options are declared as a table; insertion order is kept because it is
    # the order in which they are registered with the parser.
    option_table = {
        "--network": {
            "choices": ("mainnet", "calibnet"),
            "default": "mainnet",
            "help": "Filecoin network alias.",
        },
        "--peer": {
            "type": str,
            "default": None,
            "help": "Explicit /.../p2p/<peer-id> multiaddr to dial.",
        },
        "--resolve-dns": {
            "action": argparse.BooleanOptionalAction,
            "default": True,
            "help": "Resolve DNS bootstrap entries for runtime dialing.",
        },
        "--timeout": {
            "type": float,
            "default": 10.0,
            "help": "Per-peer dial timeout in seconds.",
        },
        "--json": {
            "action": "store_true",
            "help": "Print deterministic JSON output.",
        },
    }
    for flag, settings in option_table.items():
        cli.add_argument(flag, **settings)
    return cli
152
153
def main() -> None:
    """Parse CLI arguments, run the demo under Trio, and exit with its status.

    The process exits with the integer returned by ``run``. A keyboard
    interrupt is logged and mapped to exit status 130.
    """
    args = build_parser().parse_args()
    try:
        exit_code = trio.run(
            run,
            args.network,
            args.peer,
            args.resolve_dns,
            args.timeout,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()
Ping + identify a Filecoin peer
$ filecoin-ping-identify-demo --network calibnet --ping-count 3 --json
1from __future__ import annotations
2
3import argparse
4import json
5import logging
6from statistics import mean
7from typing import Any
8
9import multiaddr
10import trio
11
12from libp2p import new_host
13from libp2p.filecoin import (
14 FIL_CHAIN_EXCHANGE_PROTOCOL,
15 FIL_HELLO_PROTOCOL,
16 get_network_preset,
17 get_runtime_bootstrap_addresses,
18)
19from libp2p.host.ping import PingService
20from libp2p.identity.identify.identify import (
21 ID as IDENTIFY_PROTOCOL_ID,
22 parse_identify_response,
23)
24from libp2p.peer.peerinfo import info_from_p2p_addr
25from libp2p.utils.address_validation import find_free_port
26from libp2p.utils.varint import read_length_prefixed_protobuf
27
# Configure the root logger at import time so the demo prints plain
# "LEVEL message" lines, then create this module's logger.
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)
30
31
32def _build_result(
33 network_alias: str,
34 network_name: str,
35 connected: bool,
36 address: str | None,
37 peer_id: str | None,
38 identify: dict[str, Any] | None,
39 ping: dict[str, Any] | None,
40 error: str | None,
41) -> dict[str, Any]:
42 return {
43 "network_alias": network_alias,
44 "network_name": network_name,
45 "connected": connected,
46 "address": address,
47 "peer_id": peer_id,
48 "identify": identify,
49 "ping": ping,
50 "error": error,
51 }
52
53
async def _run_identify(host: Any, peer_id: Any) -> dict[str, Any]:
    """Open an identify stream to ``peer_id`` and summarize the response.

    Args:
        host: Host used to open the stream (must expose ``new_stream``).
        peer_id: Identifier of the already-connected remote peer.

    Returns:
        A dict with the peer's agent version, protocol version, number of
        advertised protocols, and whether the Filecoin hello and
        chain-exchange protocol IDs appear among them.

    Raises:
        Whatever opening the stream, reading, or parsing raises; the caller
        is expected to handle it.
    """
    stream = await host.new_stream(peer_id, [IDENTIFY_PROTOCOL_ID])
    try:
        raw_response = await read_length_prefixed_protobuf(
            stream, use_varint_format=True
        )
    finally:
        # Close the stream even when the read fails so it is not leaked.
        await stream.close()

    identify_msg = parse_identify_response(raw_response)
    protocols = list(identify_msg.protocols)

    return {
        "agent_version": identify_msg.agent_version,
        "protocol_version": identify_msg.protocol_version,
        "protocol_count": len(protocols),
        "supports_filecoin_hello": str(FIL_HELLO_PROTOCOL) in protocols,
        "supports_filecoin_chain_exchange": str(FIL_CHAIN_EXCHANGE_PROTOCOL)
        in protocols,
    }
70
71
async def _run_ping(host: Any, peer_id: Any, ping_count: int) -> dict[str, Any]:
    """Ping ``peer_id`` ``ping_count`` times and summarize the round trips.

    Returns a dict with the requested count, the RTT samples as returned by
    ``PingService.ping`` (the ``_us`` key suffix suggests microseconds —
    confirm against the ping service), and their mean, or ``None`` when no
    samples came back.
    """
    samples = await PingService(host).ping(peer_id, ping_amt=ping_count)
    average = mean(samples) if samples else None
    return {"count": ping_count, "rtts_us": samples, "avg_rtt_us": average}
80
81
async def run(
    network: str,
    peer: str | None,
    resolve_dns: bool,
    timeout: float,
    ping_count: int,
    as_json: bool,
) -> int:
    """Dial a Filecoin peer, then run identify and ping against it.

    Args:
        network: Filecoin network alias passed to the preset lookup.
        peer: Explicit multiaddr to dial; when falsy, the runtime bootstrap
            addresses for ``network`` are used instead.
        resolve_dns: Forwarded to ``get_runtime_bootstrap_addresses``.
        timeout: Per-peer dial timeout in seconds.
        ping_count: Number of ping probes to send after dialing.
        as_json: Print the result as JSON instead of logging it.

    Returns:
        ``0`` when the dial, identify and ping all succeeded, ``1``
        otherwise (including when there is nothing to dial).
    """
    preset = get_network_preset(network)
    network_name = preset.genesis_network_name

    candidates = (
        [peer]
        if peer
        else get_runtime_bootstrap_addresses(network, resolve_dns=resolve_dns)
    )

    if not candidates:
        result = _build_result(
            network_alias=network,
            network_name=network_name,
            connected=False,
            address=None,
            peer_id=None,
            identify=None,
            ping=None,
            error="no candidate peer addresses available",
        )
        if as_json:
            print(json.dumps(result, indent=2, sort_keys=True))
        else:
            logger.error(result["error"])
        return 1

    listen_port = find_free_port()
    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
    host = new_host(listen_addrs=listen_addrs)

    selected_addr: str | None = None
    selected_info = None
    last_error: str | None = None
    identify_payload: dict[str, Any] | None = None
    ping_payload: dict[str, Any] | None = None

    async with host.run(listen_addrs=listen_addrs):
        for addr in candidates:
            try:
                info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
                with trio.fail_after(timeout):
                    await host.connect(info)
                selected_addr = addr
                selected_info = info
                break
            except Exception as exc:
                # Some exceptions (notably trio.TooSlowError on timeout)
                # stringify to "", so fall back to the exception class name.
                last_error = str(exc) or type(exc).__name__

        if selected_info is not None:
            try:
                identify_payload = await _run_identify(host, selected_info.peer_id)
                ping_payload = await _run_ping(host, selected_info.peer_id, ping_count)
            except Exception as exc:
                last_error = str(exc) or type(exc).__name__

    succeeded = (
        selected_addr is not None
        and identify_payload is not None
        and ping_payload is not None
    )
    result = _build_result(
        network_alias=network,
        network_name=network_name,
        connected=succeeded,
        address=selected_addr,
        peer_id=str(selected_info.peer_id) if selected_info else None,
        identify=identify_payload,
        ping=ping_payload,
        # A failure on an earlier candidate must not be reported once a later
        # candidate has passed every step.
        error=None if succeeded else last_error,
    )

    if as_json:
        print(json.dumps(result, indent=2, sort_keys=True))
    else:
        logger.info("network alias: %s", result["network_alias"])
        logger.info("network name: %s", result["network_name"])
        logger.info("peer: %s", result["peer_id"])
        logger.info("address: %s", result["address"])
        if result["identify"] is not None:
            logger.info("agent version: %s", result["identify"]["agent_version"])
            logger.info(
                "supports /fil/hello/1.0.0: %s",
                result["identify"]["supports_filecoin_hello"],
            )
            logger.info(
                "supports /fil/chain/xchg/0.0.1: %s",
                result["identify"]["supports_filecoin_chain_exchange"],
            )
        if result["ping"] is not None:
            logger.info("ping avg RTT (us): %s", result["ping"]["avg_rtt_us"])
        if result["error"] is not None:
            logger.error("diagnostic error: %s", result["error"])

    return 0 if result["connected"] else 1
182
183
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ping + identify demo.

    Options: ``--network``, ``--peer``, ``--resolve-dns`` /
    ``--no-resolve-dns``, ``--timeout``, ``--ping-count`` and ``--json``.
    """
    cli = argparse.ArgumentParser(
        description="Dial a Filecoin peer and run identify + ping diagnostics.",
    )
    # Options are declared as a table; insertion order is kept because it is
    # the order in which they are registered with the parser.
    option_table = {
        "--network": {
            "choices": ("mainnet", "calibnet"),
            "default": "mainnet",
            "help": "Filecoin network alias.",
        },
        "--peer": {
            "type": str,
            "default": None,
            "help": "Explicit /.../p2p/<peer-id> multiaddr to dial.",
        },
        "--resolve-dns": {
            "action": argparse.BooleanOptionalAction,
            "default": True,
            "help": "Resolve DNS bootstrap entries for runtime dialing.",
        },
        "--timeout": {
            "type": float,
            "default": 10.0,
            "help": "Per-peer dial timeout in seconds.",
        },
        "--ping-count": {
            "type": int,
            "default": 3,
            "help": "Number of ping probes to run after dialing.",
        },
        "--json": {
            "action": "store_true",
            "help": "Print deterministic JSON output.",
        },
    }
    for flag, settings in option_table.items():
        cli.add_argument(flag, **settings)
    return cli
224
225
def main() -> None:
    """Parse CLI arguments, run the demo under Trio, and exit with its status.

    The process exits with the integer returned by ``run``. A keyboard
    interrupt is logged and mapped to exit status 130.
    """
    args = build_parser().parse_args()
    try:
        exit_code = trio.run(
            run,
            args.network,
            args.peer,
            args.resolve_dns,
            args.timeout,
            args.ping_count,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()
Read-only pubsub observer
This observer does not publish messages. It subscribes to the selected Filecoin gossip topics and logs metadata for each inbound message: topic, source peer ID, payload size, and observation timestamp.
$ filecoin-pubsub-demo --network mainnet --topic both --seconds 20
$ filecoin-pubsub-demo --network calibnet --topic blocks --max-messages 25 --json
1from __future__ import annotations
2
3import argparse
4from collections.abc import Sequence
5from dataclasses import dataclass
6from datetime import datetime, timezone
7import json
8import logging
9from typing import Any
10
11import multiaddr
12import trio
13
14from libp2p import new_host
15from libp2p.filecoin import (
16 FIL_CHAIN_EXCHANGE_PROTOCOL,
17 FIL_HELLO_PROTOCOL,
18 blocks_topic,
19 build_filecoin_gossipsub,
20 build_filecoin_pubsub,
21 dht_protocol_name,
22 get_network_preset,
23 get_runtime_bootstrap_addresses,
24 messages_topic,
25)
26from libp2p.peer.id import ID
27from libp2p.peer.peerinfo import info_from_p2p_addr
28from libp2p.tools.anyio_service import background_trio_service
29from libp2p.utils.address_validation import find_free_port
30
# Configure the root logger at import time so the demo prints plain
# "LEVEL message" lines, then create this module's logger.
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)
33
34
@dataclass
class ObserverState:
    """Shared counters and stop signal for the topic observer tasks."""

    # Set once the observer should shut down.
    stop_event: trio.Event
    # Messages observed so far across all topics.
    message_count: int
    # Optional message cap; None or a non-positive value means no cap.
    max_messages: int | None
    # Guards updates to message_count.
    lock: trio.Lock

    async def on_message(self) -> int:
        """Record one observed message and return the updated running total.

        Sets ``stop_event`` once a positive ``max_messages`` cap is reached.
        """
        async with self.lock:
            self.message_count += 1
            seen = self.message_count
            cap = self.max_messages
            if cap is None or cap <= 0:
                return seen
            if seen >= cap:
                self.stop_event.set()
            return seen
52
53
def _selected_topics(topic_mode: str, network_name: str) -> list[str]:
    """Return the gossip topic names to observe for ``topic_mode``.

    ``"blocks"`` and ``"messages"`` select a single topic; any other value
    selects both, blocks first.
    """
    single_topic_builders = {
        "blocks": (blocks_topic,),
        "messages": (messages_topic,),
    }
    builders = single_topic_builders.get(topic_mode, (blocks_topic, messages_topic))
    return [build(network_name) for build in builders]
60
61
def _build_snapshot(
    network_alias: str,
    network_name: str,
    bootstrap_addrs: Sequence[str],
    listen_port: int,
    topics: Sequence[str],
    max_messages: int | None,
) -> dict[str, Any]:
    """Describe the observer's configuration as a plain dict.

    The snapshot holds the network identifiers, the Filecoin protocol IDs,
    the block/message topic names plus the selected subset, the bootstrap
    addresses and their count, the listen multiaddr string, and the message
    cap. Sequences are copied into lists.
    """
    protocol_section = {
        "hello": str(FIL_HELLO_PROTOCOL),
        "chain_exchange": str(FIL_CHAIN_EXCHANGE_PROTOCOL),
        "dht": str(dht_protocol_name(network_name)),
    }
    topic_section = {
        "blocks": blocks_topic(network_name),
        "messages": messages_topic(network_name),
        "selected": list(topics),
    }
    snapshot: dict[str, Any] = {
        "network_alias": network_alias,
        "network_name": network_name,
        "protocols": protocol_section,
        "topics": topic_section,
        "mode": "read_only_observer",
        "bootstrap_count": len(bootstrap_addrs),
        "bootstrap_addresses": list(bootstrap_addrs),
        "listen_addr": f"/ip4/0.0.0.0/tcp/{listen_port}",
        "max_messages": max_messages,
    }
    return snapshot
89
90
async def _observe_topic(
    topic: str,
    subscription: Any,
    state: ObserverState,
) -> None:
    """Log metadata for each message received on one subscription.

    Runs until ``state.stop_event`` is set (checked before each read) or
    until reading from the subscription raises, which is logged as a
    warning and ends this task.
    """
    while True:
        if state.stop_event.is_set():
            break

        try:
            message = await subscription.get()
        except Exception as exc:
            logger.warning("subscription error on %s: %s", topic, exc)
            return

        # Messages without a sender are reported as "unknown".
        source = ID(message.from_id).to_base58() if message.from_id else "unknown"

        ordinal = await state.on_message()
        payload_size = 0 if message.data is None else len(message.data)
        observed_at = datetime.now(timezone.utc).isoformat()
        logger.info(
            "observed message #%d topic=%s source=%s payload_bytes=%d ts=%s",
            ordinal,
            topic,
            source,
            payload_size,
            observed_at,
        )
118
119
async def _connect_bootstrap_peers(
    host: Any,
    addrs: Sequence[str],
    timeout: float = 8.0,
    max_success: int = 3,
) -> int:
    """Dial bootstrap addresses in order until enough connections succeed.

    Args:
        host: Host used to dial (must expose ``connect``).
        addrs: Candidate bootstrap multiaddr strings.
        timeout: Per-peer dial timeout in seconds.
        max_success: Stop once this many peers are connected.

    Returns:
        The number of peers connected. Invalid addresses, timeouts and
        dial errors are logged at debug level and skipped.
    """
    established = 0
    for addr in addrs:
        if established >= max_success:
            break

        try:
            info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
        except Exception as exc:
            logger.debug("invalid bootstrap address %s: %s", addr, exc)
            continue

        try:
            with trio.move_on_after(timeout) as dial_scope:
                await host.connect(info)
            if not dial_scope.cancelled_caught:
                established += 1
                logger.info("connected bootstrap peer: %s", info.peer_id)
            else:
                logger.debug("timeout connecting bootstrap peer %s", info.peer_id)
        except Exception as exc:
            logger.debug("failed bootstrap connect %s: %s", info.peer_id, exc)
    return established
147
148
async def run(
    network: str,
    resolve_dns: bool,
    include_quic: bool,
    run_seconds: float,
    max_messages: int | None,
    topic_mode: str,
    as_json: bool,
) -> int:
    """Run the read-only pubsub observer and return exit status ``0``.

    Args:
        network: Filecoin network alias passed to the preset lookup.
        resolve_dns: Forwarded to ``get_runtime_bootstrap_addresses``.
        include_quic: Forwarded to ``get_runtime_bootstrap_addresses``.
        run_seconds: Observation window in seconds; a non-positive value
            waits until the stop event fires.
        max_messages: Optional cap on observed messages.
        topic_mode: Which topics to observe (see ``_selected_topics``).
        as_json: Print the configuration snapshot as JSON instead of
            logging it.
    """
    network_name = get_network_preset(network).genesis_network_name
    topics = _selected_topics(topic_mode, network_name)

    bootstrap_addrs = get_runtime_bootstrap_addresses(
        network,
        resolve_dns=resolve_dns,
        include_quic=include_quic,
    )

    listen_port = find_free_port()
    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
    # Delay bootstrap dials until pubsub services are running to avoid races
    # where inbound streams are handled before Pubsub manager initialization.
    host = new_host(listen_addrs=listen_addrs)

    gossipsub = build_filecoin_gossipsub(network_name=network_name)
    pubsub = build_filecoin_pubsub(
        host=host,
        gossipsub=gossipsub,
        network_name=network_name,
    )

    snapshot = _build_snapshot(
        network_alias=network,
        network_name=network_name,
        bootstrap_addrs=bootstrap_addrs,
        listen_port=listen_port,
        topics=topics,
        max_messages=max_messages,
    )

    if as_json:
        print(json.dumps(snapshot, indent=2, sort_keys=True))
    else:
        protocol_ids = snapshot["protocols"]
        summary_lines = (
            ("network alias: %s", snapshot["network_alias"]),
            ("network name: %s", snapshot["network_name"]),
            ("hello protocol: %s", protocol_ids["hello"]),
            ("chain exchange protocol: %s", protocol_ids["chain_exchange"]),
            ("dht protocol: %s", protocol_ids["dht"]),
            ("selected topics: %s", ", ".join(topics)),
        )
        for template, value in summary_lines:
            logger.info(template, value)
        logger.info("runtime bootstrap peers: %d", snapshot["bootstrap_count"])
        logger.info("observer mode: read-only (publishing disabled)")
        # Only the first five bootstrap addresses are listed.
        for addr in bootstrap_addrs[:5]:
            logger.info("bootstrap: %s", addr)

    state = ObserverState(
        stop_event=trio.Event(),
        message_count=0,
        max_messages=max_messages,
        lock=trio.Lock(),
    )

    async with host.run(listen_addrs=listen_addrs):
        async with background_trio_service(pubsub):
            async with background_trio_service(gossipsub):
                await pubsub.wait_until_ready()
                bootstrap_connected = await _connect_bootstrap_peers(
                    host,
                    bootstrap_addrs,
                    timeout=8.0,
                    max_success=3,
                )
                logger.info("connected bootstrap peers: %d", bootstrap_connected)

                subscriptions = {}
                for topic in topics:
                    subscriptions[topic] = await pubsub.subscribe(topic)
                    logger.info("subscribed to %s", topic)

                async with trio.open_nursery() as nursery:
                    for topic, subscription in subscriptions.items():
                        nursery.start_soon(_observe_topic, topic, subscription, state)

                    timed_run = run_seconds > 0
                    if timed_run:
                        with trio.move_on_after(run_seconds):
                            await state.stop_event.wait()
                        # Mark the run as finished even if the window simply
                        # elapsed, so the observer loops stop.
                        state.stop_event.set()
                    else:
                        await state.stop_event.wait()
                    # Observers may be blocked waiting for a message.
                    nursery.cancel_scope.cancel()

    return 0
244
245
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the pubsub observer demo.

    Options: ``--network``, ``--resolve-dns`` / ``--no-resolve-dns``,
    ``--include-quic``, ``--seconds``, ``--max-messages``, ``--topic`` and
    ``--json``.
    """
    cli = argparse.ArgumentParser(
        description="Filecoin pubsub read-only observer using libp2p.filecoin presets.",
    )
    # Options are declared as a table; insertion order is kept because it is
    # the order in which they are registered with the parser.
    option_table = {
        "--network": {
            "choices": ("mainnet", "calibnet"),
            "default": "mainnet",
            "help": "Filecoin network alias.",
        },
        "--resolve-dns": {
            "action": argparse.BooleanOptionalAction,
            "default": True,
            "help": "Resolve dns bootstrap entries to /ip4/.../tcp/... addresses.",
        },
        "--include-quic": {
            "action": "store_true",
            "help": "Include QUIC and WebTransport bootstrap entries.",
        },
        "--seconds": {
            "type": float,
            "default": 20.0,
            "help": "How long to keep the observer running. Use 0 for indefinite run.",
        },
        "--max-messages": {
            "type": int,
            "default": None,
            "help": "Stop after this many observed messages across selected topics.",
        },
        "--topic": {
            "choices": ("blocks", "messages", "both"),
            "default": "both",
            "help": "Which Filecoin gossip topics to observe.",
        },
        "--json": {
            "action": "store_true",
            "help": "Print configuration snapshot as JSON before starting.",
        },
    }
    for flag, settings in option_table.items():
        cli.add_argument(flag, **settings)
    return cli
291
292
def main() -> None:
    """Parse CLI arguments, run the observer under Trio, and exit with its status.

    The process exits with the integer returned by ``run``. A keyboard
    interrupt is logged and mapped to exit status 130.
    """
    args = build_parser().parse_args()

    try:
        exit_code = trio.run(
            run,
            args.network,
            args.resolve_dns,
            args.include_quic,
            args.seconds,
            args.max_messages,
            args.topic,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()
CLI helpers
$ filecoin-dx topics --network mainnet --json
$ filecoin-dx bootstrap --network mainnet --runtime --resolve-dns --json
$ python -m libp2p.filecoin preset --network calibnet --json