Filecoin DX Examples

Read this first:

These examples show practical Filecoin-focused workflows using libp2p.filecoin.

Connect to a Filecoin peer

$ filecoin-connect-demo --network mainnet --resolve-dns --json
  1from __future__ import annotations
  2
  3import argparse
  4import json
  5import logging
  6from typing import Any
  7
  8import multiaddr
  9import trio
 10
 11from libp2p import new_host
 12from libp2p.filecoin import get_network_preset, get_runtime_bootstrap_addresses
 13from libp2p.peer.peerinfo import info_from_p2p_addr
 14from libp2p.utils.address_validation import find_free_port
 15
 16logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
 17logger = logging.getLogger(__name__)
 18
 19
 20def _build_result(
 21    network_alias: str,
 22    network_name: str,
 23    attempted: int,
 24    connected: bool,
 25    address: str | None,
 26    peer_id: str | None,
 27    error: str | None,
 28) -> dict[str, Any]:
 29    return {
 30        "network_alias": network_alias,
 31        "network_name": network_name,
 32        "attempted": attempted,
 33        "connected": connected,
 34        "address": address,
 35        "peer_id": peer_id,
 36        "error": error,
 37    }
 38
 39
 40async def run(
 41    network: str,
 42    peer: str | None,
 43    resolve_dns: bool,
 44    timeout: float,
 45    as_json: bool,
 46) -> int:
 47    preset = get_network_preset(network)
 48    network_name = preset.genesis_network_name
 49
 50    candidates = (
 51        [peer]
 52        if peer
 53        else get_runtime_bootstrap_addresses(network, resolve_dns=resolve_dns)
 54    )
 55
 56    if not candidates:
 57        result = _build_result(
 58            network_alias=network,
 59            network_name=network_name,
 60            attempted=0,
 61            connected=False,
 62            address=None,
 63            peer_id=None,
 64            error="no candidate peer addresses available",
 65        )
 66        if as_json:
 67            print(json.dumps(result, indent=2, sort_keys=True))
 68        else:
 69            logger.error(result["error"])
 70        return 1
 71
 72    listen_port = find_free_port()
 73    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
 74    host = new_host(listen_addrs=listen_addrs)
 75
 76    selected_addr: str | None = None
 77    selected_peer_id: str | None = None
 78    last_error: str | None = None
 79
 80    async with host.run(listen_addrs=listen_addrs):
 81        for addr in candidates:
 82            try:
 83                info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
 84                with trio.fail_after(timeout):
 85                    await host.connect(info)
 86                selected_addr = addr
 87                selected_peer_id = str(info.peer_id)
 88                break
 89            except Exception as exc:
 90                last_error = str(exc)
 91
 92    connected = selected_addr is not None
 93    result = _build_result(
 94        network_alias=network,
 95        network_name=network_name,
 96        attempted=len(candidates),
 97        connected=connected,
 98        address=selected_addr,
 99        peer_id=selected_peer_id,
100        error=None if connected else last_error,
101    )
102
103    if as_json:
104        print(json.dumps(result, indent=2, sort_keys=True))
105    else:
106        logger.info("network alias: %s", result["network_alias"])
107        logger.info("network name: %s", result["network_name"])
108        logger.info("attempted peers: %s", result["attempted"])
109        if connected:
110            logger.info("connected peer: %s", result["peer_id"])
111            logger.info("connected address: %s", result["address"])
112        else:
113            logger.error("connect failed: %s", result["error"])
114
115    return 0 if connected else 1
116
117
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the connect demo."""
    cli = argparse.ArgumentParser(
        description="Connect to a Filecoin peer via py-libp2p runtime bootstrap set.",
    )
    option = cli.add_argument

    option(
        "--network",
        default="mainnet",
        choices=("mainnet", "calibnet"),
        help="Filecoin network alias.",
    )
    option(
        "--peer",
        default=None,
        type=str,
        help="Explicit /.../p2p/<peer-id> multiaddr to dial.",
    )
    # BooleanOptionalAction also registers the --no-resolve-dns negation.
    option(
        "--resolve-dns",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Resolve DNS bootstrap entries for runtime dialing.",
    )
    option(
        "--timeout",
        default=10.0,
        type=float,
        help="Per-peer dial timeout in seconds.",
    )
    option(
        "--json",
        action="store_true",
        help="Print deterministic JSON output.",
    )
    return cli
152
153
def main() -> None:
    """
    Command-line entry point for the connect demo.

    Parses the arguments, runs ``run`` under trio and exits the process with
    the code it returns, or with 130 when interrupted from the keyboard.
    """
    args = build_parser().parse_args()
    try:
        exit_code = trio.run(
            run,
            args.network,
            args.peer,
            args.resolve_dns,
            args.timeout,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()

Ping + identify a Filecoin peer

$ filecoin-ping-identify-demo --network calibnet --ping-count 3 --json
  1from __future__ import annotations
  2
  3import argparse
  4import json
  5import logging
  6from statistics import mean
  7from typing import Any
  8
  9import multiaddr
 10import trio
 11
 12from libp2p import new_host
 13from libp2p.filecoin import (
 14    FIL_CHAIN_EXCHANGE_PROTOCOL,
 15    FIL_HELLO_PROTOCOL,
 16    get_network_preset,
 17    get_runtime_bootstrap_addresses,
 18)
 19from libp2p.host.ping import PingService
 20from libp2p.identity.identify.identify import (
 21    ID as IDENTIFY_PROTOCOL_ID,
 22    parse_identify_response,
 23)
 24from libp2p.peer.peerinfo import info_from_p2p_addr
 25from libp2p.utils.address_validation import find_free_port
 26from libp2p.utils.varint import read_length_prefixed_protobuf
 27
 28logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
 29logger = logging.getLogger(__name__)
 30
 31
 32def _build_result(
 33    network_alias: str,
 34    network_name: str,
 35    connected: bool,
 36    address: str | None,
 37    peer_id: str | None,
 38    identify: dict[str, Any] | None,
 39    ping: dict[str, Any] | None,
 40    error: str | None,
 41) -> dict[str, Any]:
 42    return {
 43        "network_alias": network_alias,
 44        "network_name": network_name,
 45        "connected": connected,
 46        "address": address,
 47        "peer_id": peer_id,
 48        "identify": identify,
 49        "ping": ping,
 50        "error": error,
 51    }
 52
 53
 54async def _run_identify(host: Any, peer_id: Any) -> dict[str, Any]:
 55    stream = await host.new_stream(peer_id, [IDENTIFY_PROTOCOL_ID])
 56    raw_response = await read_length_prefixed_protobuf(stream, use_varint_format=True)
 57    await stream.close()
 58
 59    identify_msg = parse_identify_response(raw_response)
 60    protocols = list(identify_msg.protocols)
 61
 62    return {
 63        "agent_version": identify_msg.agent_version,
 64        "protocol_version": identify_msg.protocol_version,
 65        "protocol_count": len(protocols),
 66        "supports_filecoin_hello": str(FIL_HELLO_PROTOCOL) in protocols,
 67        "supports_filecoin_chain_exchange": str(FIL_CHAIN_EXCHANGE_PROTOCOL)
 68        in protocols,
 69    }
 70
 71
 72async def _run_ping(host: Any, peer_id: Any, ping_count: int) -> dict[str, Any]:
 73    ping_service = PingService(host)
 74    rtts = await ping_service.ping(peer_id, ping_amt=ping_count)
 75    return {
 76        "count": ping_count,
 77        "rtts_us": rtts,
 78        "avg_rtt_us": mean(rtts) if rtts else None,
 79    }
 80
 81
 82async def run(
 83    network: str,
 84    peer: str | None,
 85    resolve_dns: bool,
 86    timeout: float,
 87    ping_count: int,
 88    as_json: bool,
 89) -> int:
 90    preset = get_network_preset(network)
 91    network_name = preset.genesis_network_name
 92
 93    candidates = (
 94        [peer]
 95        if peer
 96        else get_runtime_bootstrap_addresses(network, resolve_dns=resolve_dns)
 97    )
 98
 99    if not candidates:
100        result = _build_result(
101            network_alias=network,
102            network_name=network_name,
103            connected=False,
104            address=None,
105            peer_id=None,
106            identify=None,
107            ping=None,
108            error="no candidate peer addresses available",
109        )
110        if as_json:
111            print(json.dumps(result, indent=2, sort_keys=True))
112        else:
113            logger.error(result["error"])
114        return 1
115
116    listen_port = find_free_port()
117    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
118    host = new_host(listen_addrs=listen_addrs)
119
120    selected_addr: str | None = None
121    selected_info = None
122    last_error: str | None = None
123    identify_payload: dict[str, Any] | None = None
124    ping_payload: dict[str, Any] | None = None
125
126    async with host.run(listen_addrs=listen_addrs):
127        for addr in candidates:
128            try:
129                info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
130                with trio.fail_after(timeout):
131                    await host.connect(info)
132                selected_addr = addr
133                selected_info = info
134                break
135            except Exception as exc:
136                last_error = str(exc)
137
138        if selected_info is not None:
139            try:
140                identify_payload = await _run_identify(host, selected_info.peer_id)
141                ping_payload = await _run_ping(host, selected_info.peer_id, ping_count)
142            except Exception as exc:
143                last_error = str(exc)
144
145    connected = selected_addr is not None
146    result = _build_result(
147        network_alias=network,
148        network_name=network_name,
149        connected=(
150            connected and identify_payload is not None and ping_payload is not None
151        ),
152        address=selected_addr,
153        peer_id=str(selected_info.peer_id) if selected_info else None,
154        identify=identify_payload,
155        ping=ping_payload,
156        error=last_error,
157    )
158
159    if as_json:
160        print(json.dumps(result, indent=2, sort_keys=True))
161    else:
162        logger.info("network alias: %s", result["network_alias"])
163        logger.info("network name: %s", result["network_name"])
164        logger.info("peer: %s", result["peer_id"])
165        logger.info("address: %s", result["address"])
166        if result["identify"] is not None:
167            logger.info("agent version: %s", result["identify"]["agent_version"])
168            logger.info(
169                "supports /fil/hello/1.0.0: %s",
170                result["identify"]["supports_filecoin_hello"],
171            )
172            logger.info(
173                "supports /fil/chain/xchg/0.0.1: %s",
174                result["identify"]["supports_filecoin_chain_exchange"],
175            )
176        if result["ping"] is not None:
177            logger.info("ping avg RTT (us): %s", result["ping"]["avg_rtt_us"])
178        if result["error"] is not None:
179            logger.error("diagnostic error: %s", result["error"])
180
181    return 0 if result["connected"] else 1
182
183
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ping/identify demo."""
    cli = argparse.ArgumentParser(
        description="Dial a Filecoin peer and run identify + ping diagnostics.",
    )
    option = cli.add_argument

    option(
        "--network",
        default="mainnet",
        choices=("mainnet", "calibnet"),
        help="Filecoin network alias.",
    )
    option(
        "--peer",
        default=None,
        type=str,
        help="Explicit /.../p2p/<peer-id> multiaddr to dial.",
    )
    # BooleanOptionalAction also registers the --no-resolve-dns negation.
    option(
        "--resolve-dns",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Resolve DNS bootstrap entries for runtime dialing.",
    )
    option(
        "--timeout",
        default=10.0,
        type=float,
        help="Per-peer dial timeout in seconds.",
    )
    option(
        "--ping-count",
        default=3,
        type=int,
        help="Number of ping probes to run after dialing.",
    )
    option(
        "--json",
        action="store_true",
        help="Print deterministic JSON output.",
    )
    return cli
224
225
def main() -> None:
    """
    Command-line entry point for the ping/identify demo.

    Parses the arguments, runs ``run`` under trio and exits the process with
    the code it returns, or with 130 when interrupted from the keyboard.
    """
    args = build_parser().parse_args()
    try:
        exit_code = trio.run(
            run,
            args.network,
            args.peer,
            args.resolve_dns,
            args.timeout,
            args.ping_count,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()

Read-only pubsub observer

This observer does not publish messages. It subscribes to Filecoin gossip topics and reports inbound metadata.

$ filecoin-pubsub-demo --network mainnet --topic both --seconds 20
$ filecoin-pubsub-demo --network calibnet --topic blocks --max-messages 25 --json
  1from __future__ import annotations
  2
  3import argparse
  4from collections.abc import Sequence
  5from dataclasses import dataclass
  6from datetime import datetime, timezone
  7import json
  8import logging
  9from typing import Any
 10
 11import multiaddr
 12import trio
 13
 14from libp2p import new_host
 15from libp2p.filecoin import (
 16    FIL_CHAIN_EXCHANGE_PROTOCOL,
 17    FIL_HELLO_PROTOCOL,
 18    blocks_topic,
 19    build_filecoin_gossipsub,
 20    build_filecoin_pubsub,
 21    dht_protocol_name,
 22    get_network_preset,
 23    get_runtime_bootstrap_addresses,
 24    messages_topic,
 25)
 26from libp2p.peer.id import ID
 27from libp2p.peer.peerinfo import info_from_p2p_addr
 28from libp2p.tools.anyio_service import background_trio_service
 29from libp2p.utils.address_validation import find_free_port
 30
 31logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
 32logger = logging.getLogger(__name__)
 33
 34
 35@dataclass
 36class ObserverState:
 37    stop_event: trio.Event
 38    message_count: int
 39    max_messages: int | None
 40    lock: trio.Lock
 41
 42    async def on_message(self) -> int:
 43        async with self.lock:
 44            self.message_count += 1
 45            if (
 46                self.max_messages is not None
 47                and self.max_messages > 0
 48                and self.message_count >= self.max_messages
 49            ):
 50                self.stop_event.set()
 51            return self.message_count
 52
 53
 54def _selected_topics(topic_mode: str, network_name: str) -> list[str]:
 55    if topic_mode == "blocks":
 56        return [blocks_topic(network_name)]
 57    if topic_mode == "messages":
 58        return [messages_topic(network_name)]
 59    return [blocks_topic(network_name), messages_topic(network_name)]
 60
 61
 62def _build_snapshot(
 63    network_alias: str,
 64    network_name: str,
 65    bootstrap_addrs: Sequence[str],
 66    listen_port: int,
 67    topics: Sequence[str],
 68    max_messages: int | None,
 69) -> dict[str, Any]:
 70    return {
 71        "network_alias": network_alias,
 72        "network_name": network_name,
 73        "protocols": {
 74            "hello": str(FIL_HELLO_PROTOCOL),
 75            "chain_exchange": str(FIL_CHAIN_EXCHANGE_PROTOCOL),
 76            "dht": str(dht_protocol_name(network_name)),
 77        },
 78        "topics": {
 79            "blocks": blocks_topic(network_name),
 80            "messages": messages_topic(network_name),
 81            "selected": list(topics),
 82        },
 83        "mode": "read_only_observer",
 84        "bootstrap_count": len(bootstrap_addrs),
 85        "bootstrap_addresses": list(bootstrap_addrs),
 86        "listen_addr": f"/ip4/0.0.0.0/tcp/{listen_port}",
 87        "max_messages": max_messages,
 88    }
 89
 90
 91async def _observe_topic(
 92    topic: str,
 93    subscription: Any,
 94    state: ObserverState,
 95) -> None:
 96    while not state.stop_event.is_set():
 97        try:
 98            message = await subscription.get()
 99        except Exception as exc:
100            logger.warning("subscription error on %s: %s", topic, exc)
101            return
102
103        source = "unknown"
104        if message.from_id:
105            source = ID(message.from_id).to_base58()
106
107        ordinal = await state.on_message()
108        payload_size = len(message.data) if message.data is not None else 0
109        observed_at = datetime.now(timezone.utc).isoformat()
110        logger.info(
111            "observed message #%d topic=%s source=%s payload_bytes=%d ts=%s",
112            ordinal,
113            topic,
114            source,
115            payload_size,
116            observed_at,
117        )
118
119
120async def _connect_bootstrap_peers(
121    host: Any,
122    addrs: Sequence[str],
123    timeout: float = 8.0,
124    max_success: int = 3,
125) -> int:
126    connected = 0
127    for addr in addrs:
128        if connected >= max_success:
129            break
130        try:
131            info = info_from_p2p_addr(multiaddr.Multiaddr(addr))
132        except Exception as exc:
133            logger.debug("invalid bootstrap address %s: %s", addr, exc)
134            continue
135
136        try:
137            with trio.move_on_after(timeout) as scope:
138                await host.connect(info)
139            if scope.cancelled_caught:
140                logger.debug("timeout connecting bootstrap peer %s", info.peer_id)
141                continue
142            connected += 1
143            logger.info("connected bootstrap peer: %s", info.peer_id)
144        except Exception as exc:
145            logger.debug("failed bootstrap connect %s: %s", info.peer_id, exc)
146    return connected
147
148
async def run(
    network: str,
    resolve_dns: bool,
    include_quic: bool,
    run_seconds: float,
    max_messages: int | None,
    topic_mode: str,
    as_json: bool,
) -> int:
    """
    Run the read-only Filecoin pubsub observer.

    Prints or logs a configuration snapshot, starts the host together with
    the pubsub and gossipsub services, dials bootstrap peers, subscribes to
    the selected topics and logs inbound message metadata until the message
    budget is reached or ``run_seconds`` elapses. A non-positive
    ``run_seconds`` waits for the message budget only.

    :return: always ``0``.
    """
    network_name = get_network_preset(network).genesis_network_name
    topics = _selected_topics(topic_mode, network_name)

    runtime_bootstrap = get_runtime_bootstrap_addresses(
        network,
        resolve_dns=resolve_dns,
        include_quic=include_quic,
    )

    listen_port = find_free_port()
    listen_addrs = [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}")]
    # Delay bootstrap dials until pubsub services are running to avoid races
    # where inbound streams are handled before Pubsub manager initialization.
    host = new_host(listen_addrs=listen_addrs)

    gossipsub = build_filecoin_gossipsub(network_name=network_name)
    pubsub = build_filecoin_pubsub(
        host=host,
        gossipsub=gossipsub,
        network_name=network_name,
    )

    snapshot = _build_snapshot(
        network_alias=network,
        network_name=network_name,
        bootstrap_addrs=runtime_bootstrap,
        listen_port=listen_port,
        topics=topics,
        max_messages=max_messages,
    )

    if not as_json:
        protocol_ids = snapshot["protocols"]
        logger.info("network alias: %s", snapshot["network_alias"])
        logger.info("network name: %s", snapshot["network_name"])
        logger.info("hello protocol: %s", protocol_ids["hello"])
        logger.info(
            "chain exchange protocol: %s",
            protocol_ids["chain_exchange"],
        )
        logger.info("dht protocol: %s", protocol_ids["dht"])
        logger.info("selected topics: %s", ", ".join(topics))
        logger.info("runtime bootstrap peers: %d", snapshot["bootstrap_count"])
        logger.info("observer mode: read-only (publishing disabled)")
        # Only the first five addresses are listed in the log.
        for addr in runtime_bootstrap[:5]:
            logger.info("bootstrap: %s", addr)
    else:
        print(json.dumps(snapshot, indent=2, sort_keys=True))

    state = ObserverState(
        stop_event=trio.Event(),
        message_count=0,
        max_messages=max_messages,
        lock=trio.Lock(),
    )
    stop_event = state.stop_event

    async with host.run(listen_addrs=listen_addrs):
        async with background_trio_service(pubsub):
            async with background_trio_service(gossipsub):
                await pubsub.wait_until_ready()
                dialed = await _connect_bootstrap_peers(
                    host,
                    runtime_bootstrap,
                    timeout=8.0,
                    max_success=3,
                )
                logger.info("connected bootstrap peers: %d", dialed)

                subscriptions = {}
                for topic in topics:
                    subscriptions[topic] = await pubsub.subscribe(topic)
                    logger.info("subscribed to %s", topic)

                async with trio.open_nursery() as nursery:
                    for topic, subscription in subscriptions.items():
                        nursery.start_soon(_observe_topic, topic, subscription, state)

                    if run_seconds > 0:
                        # Wait for the message budget, but no longer than
                        # the requested run time.
                        with trio.move_on_after(run_seconds):
                            await stop_event.wait()
                        stop_event.set()
                    else:
                        await stop_event.wait()
                    # Observers may be blocked waiting for a message;
                    # cancel them so the nursery can exit.
                    nursery.cancel_scope.cancel()

    return 0
244
245
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the pubsub observer demo."""
    cli = argparse.ArgumentParser(
        description="Filecoin pubsub read-only observer using libp2p.filecoin presets.",
    )
    option = cli.add_argument

    option(
        "--network",
        default="mainnet",
        choices=("mainnet", "calibnet"),
        help="Filecoin network alias.",
    )
    # BooleanOptionalAction also registers the --no-resolve-dns negation.
    option(
        "--resolve-dns",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Resolve dns bootstrap entries to /ip4/.../tcp/... addresses.",
    )
    option(
        "--include-quic",
        action="store_true",
        help="Include QUIC and WebTransport bootstrap entries.",
    )
    option(
        "--seconds",
        default=20.0,
        type=float,
        help="How long to keep the observer running. Use 0 for indefinite run.",
    )
    option(
        "--max-messages",
        default=None,
        type=int,
        help="Stop after this many observed messages across selected topics.",
    )
    option(
        "--topic",
        default="both",
        choices=("blocks", "messages", "both"),
        help="Which Filecoin gossip topics to observe.",
    )
    option(
        "--json",
        action="store_true",
        help="Print configuration snapshot as JSON before starting.",
    )
    return cli
291
292
def main() -> None:
    """
    Command-line entry point for the pubsub observer demo.

    Parses the arguments, runs ``run`` under trio and exits the process with
    the code it returns, or with 130 when interrupted from the keyboard.
    """
    args = build_parser().parse_args()

    try:
        exit_code = trio.run(
            run,
            args.network,
            args.resolve_dns,
            args.include_quic,
            args.seconds,
            args.max_messages,
            args.topic,
            args.json,
        )
    except KeyboardInterrupt:
        logger.info("interrupted")
        raise SystemExit(130)
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()

CLI helpers

$ filecoin-dx topics --network mainnet --json
$ filecoin-dx bootstrap --network mainnet --runtime --resolve-dns --json
$ python -m libp2p.filecoin preset --network calibnet --json