Agentic Request/Response Demo

This example demonstrates a Filecoin-aligned, locally simulated agent control workflow built on top of the high-level libp2p request/response helper.

It performs two one-shot exchanges over fresh protocol streams:

  • a capability_query to discover the agent’s supported task semantics

  • a store_intent request that returns a simulated storage result

The example does not call Filecoin services, Synapse SDK, or external providers. It models only the control-plane messages and result semantics.

$ agentic-request-response-demo
Listener ready, listening on:
...

Copy the printed command into another terminal; it already contains the listener's actual port and peer ID, which the example below shows as placeholders:

$ agentic-request-response-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256
Capability response:
...
Store result summary:
  status: complete
  ...

To demonstrate partial success with an unhealthy secondary provider:

$ agentic-request-response-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256 --copies 3

The full source code for this example is below:

  1from __future__ import annotations
  2
  3import argparse
  4import hashlib
  5import json
  6import logging
  7import random
  8import secrets
  9import time
 10
 11import multiaddr
 12import trio
 13
 14from libp2p import new_host
 15from libp2p.crypto.secp256k1 import create_new_key_pair
 16from libp2p.peer.peerinfo import info_from_p2p_addr
 17from libp2p.request_response import JSONCodec, RequestContext, RequestResponse
 18from libp2p.utils.address_validation import (
 19    find_free_port,
 20    get_available_interfaces,
 21    get_optimal_binding_address,
 22)
 23
 24from .agentic_protocol import (
 25    PROTOCOL_ID,
 26    REQUIRED_SESSION_PERMISSIONS,
 27    build_capability_query,
 28    build_store_intent,
 29    process_request,
 30)
 31
 32logging.basicConfig(level=logging.WARNING)
 33logging.getLogger("multiaddr").setLevel(logging.WARNING)
 34logging.getLogger("libp2p").setLevel(logging.WARNING)
 35
 36
 37def _build_authorization(mode: str, expired_session: bool) -> dict[str, object]:
 38    if mode == "root":
 39        return {"mode": "root"}
 40
 41    expires_at = int(time.time()) - 60 if expired_session else int(time.time()) + 3600
 42    return {
 43        "mode": "session_key",
 44        "expires_at": expires_at,
 45        "permissions": list(REQUIRED_SESSION_PERMISSIONS),
 46    }
 47
 48
 49async def _handler(
 50    request: dict[str, object], context: RequestContext
 51) -> dict[str, object]:
 52    del context
 53    return process_request(request)
 54
 55
 56def _make_task_id(name: str, size: int, copies: int, auth_mode: str) -> str:
 57    digest = hashlib.sha256(f"{name}:{size}:{copies}:{auth_mode}".encode()).hexdigest()
 58    return f"task-{digest[:8]}"
 59
 60
 61def _print_capabilities(response: dict[str, object]) -> None:
 62    print("Capability response:")
 63    print(json.dumps(response, indent=2, sort_keys=True))
 64
 65
 66def _print_store_result(response: dict[str, object]) -> None:
 67    print("\nStore result summary:")
 68    print(f"  status: {response.get('status')}")
 69    print(f"  complete: {response.get('complete')}")
 70    print(f"  task_id: {response.get('task_id')}")
 71    print(f"  piece_cid: {response.get('piece_cid')}")
 72
 73    copies = response.get("copies", [])
 74    if isinstance(copies, list) and copies:
 75        print("  successful copies:")
 76        for copy in copies:
 77            if not isinstance(copy, dict):
 78                continue
 79            print(
 80                "    "
 81                f"provider={copy.get('provider_id')} "
 82                f"role={copy.get('role')} "
 83                f"data_set_id={copy.get('data_set_id')} "
 84                f"piece_id={copy.get('piece_id')}"
 85            )
 86
 87    failed_attempts = response.get("failed_attempts", [])
 88    if isinstance(failed_attempts, list) and failed_attempts:
 89        print("  failed attempts:")
 90        for attempt in failed_attempts:
 91            if not isinstance(attempt, dict):
 92                continue
 93            print(
 94                "    "
 95                f"provider={attempt.get('provider_id')} "
 96                f"role={attempt.get('role')} "
 97                f"error={attempt.get('error')}"
 98            )
 99
100    errors = response.get("errors", [])
101    if isinstance(errors, list) and errors:
102        print("  errors:")
103        for error in errors:
104            print(f"    {error}")
105
106    notes = response.get("notes", [])
107    if isinstance(notes, list) and notes:
108        print("  notes:")
109        for note in notes:
110            print(f"    {note}")
111
112
async def run(
    port: int,
    destination: str | None,
    name: str,
    size: int,
    copies: int,
    with_cdn: bool,
    auth_mode: str,
    expired_session: bool,
    seed: int | None = None,
) -> None:
    """Run the demo as a listener or, when ``destination`` is given, a dialer.

    Listener mode (``destination`` empty or ``None``) prints the listening
    addresses plus ready-to-copy dialer commands and then waits forever.
    Dialer mode connects to ``destination``, sends a capability query followed
    by a store-intent request built from ``name``, ``size``, ``copies``,
    ``with_cdn`` and the authorization derived from ``auth_mode`` /
    ``expired_session``, prints both responses, and cancels the nursery.

    A non-positive ``port`` is replaced by ``find_free_port()``. When ``seed``
    is given, the host key is derived from the seeded module-level RNG instead
    of ``secrets``.
    """
    if port <= 0:
        port = find_free_port()
    listen_addr = get_available_interfaces(port)

    if seed is None:
        key_material = secrets.token_bytes(32)
    else:
        # Seeding the module-level RNG makes the 32-byte key reproducible.
        random.seed(seed)
        key_material = random.getrandbits(32 * 8).to_bytes(
            length=32, byteorder="big"
        )

    host = new_host(key_pair=create_new_key_pair(key_material))
    rr = RequestResponse(host)
    codec = JSONCodec()

    async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
        nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
        print(f"I am {host.get_id().to_string()}")

        # The handler is registered in both modes, before anything else runs.
        rr.set_handler(PROTOCOL_ID, handler=_handler, codec=codec)

        if not destination:
            peer_id = host.get_id().to_string()
            print("Listener ready, listening on:\n")
            for addr in listen_addr:
                print(f"{addr}/p2p/{peer_id}")

            optimal_addr = get_optimal_binding_address(port)
            optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
            instructions = (
                "\nRun this from the same folder in another console:\n\n"
                "agentic-request-response-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256\n\n"
                "For a partial-success run that requests too many healthy copies:\n\n"
                "agentic-request-response-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256 --copies 3\n"
            )
            print(instructions)
            print("Waiting for incoming requests...")
            await trio.sleep_forever()

        # Not expected to trigger after the listener branch above; it narrows
        # the Optional type for the dialer code that follows.
        if destination is None:
            raise ValueError("destination is required in dialer mode")

        info = info_from_p2p_addr(multiaddr.Multiaddr(destination))
        await host.connect(info)

        # Exchange 1: capability discovery.
        capability_request = build_capability_query()
        capabilities = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=capability_request,
            codec=codec,
        )
        _print_capabilities(capabilities)

        # Exchange 2: the simulated storage task.
        task_id = _make_task_id(name, size, copies, auth_mode)
        store_request = build_store_intent(
            task_id=task_id,
            content_label=name,
            declared_size_bytes=size,
            copies=copies,
            with_cdn=with_cdn,
            dataset_metadata={"source": "agent-demo"},
            piece_metadata={"filename": name},
            authorization=_build_authorization(auth_mode, expired_session),
        )
        store_result = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=store_request,
            codec=codec,
        )
        _print_store_result(store_result)

        # Stop the background cleanup task so the nursery can exit.
        nursery.cancel_scope.cancel()
198
199
def main() -> None:
    """Parse the command-line options and run the demo under trio.

    The ``--auth-mode`` choice ``session`` is translated to the
    ``session_key`` mode string before being handed to ``run``. A
    ``KeyboardInterrupt`` (Ctrl-C) ends the program quietly.
    """
    description = """
    Demonstrates a Filecoin-aligned, locally simulated agent workflow on top of
    the high-level libp2p request/response helper. Run once without -d to start
    a listener, then run again with -d to perform capability discovery and
    submit a storage-style task request.
    """
    example_maddr = (
        "/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )

    cli = argparse.ArgumentParser(description=description)
    cli.add_argument("-p", "--port", type=int, default=0, help="source port")
    cli.add_argument(
        "-d",
        "--destination",
        help=f"destination multiaddr string, e.g. {example_maddr}",
        type=str,
    )
    cli.add_argument(
        "--name",
        default="hello.txt",
        type=str,
        help="content label to include in the simulated storage request",
    )
    cli.add_argument(
        "--size",
        default=256,
        type=int,
        help="declared size in bytes for the simulated storage request",
    )
    cli.add_argument(
        "--copies",
        default=2,
        type=int,
        help="requested number of simulated storage copies",
    )
    cli.add_argument(
        "--with-cdn",
        help="set the simulated CDN flag on the storage request",
        action="store_true",
    )
    cli.add_argument(
        "--auth-mode",
        default="root",
        choices=("root", "session"),
        help="authorization mode to send with the simulated task",
    )
    cli.add_argument(
        "--expired-session",
        help="send an expired simulated session-key authorization",
        action="store_true",
    )
    cli.add_argument(
        "-s",
        "--seed",
        help="seed the RNG to make peer IDs reproducible",
        type=int,
    )
    opts = cli.parse_args()

    # Only "session" is renamed; every other value maps to "root".
    auth_mode = {"session": "session_key"}.get(opts.auth_mode, "root")
    run_args = (
        opts.port,
        opts.destination,
        opts.name,
        opts.size,
        opts.copies,
        opts.with_cdn,
        auth_mode,
        opts.expired_session,
        opts.seed,
    )
    try:
        trio.run(run, *run_args)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the listener; exit silently.
        return
276
277
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()