Agentic Request/Response Demo
This example demonstrates a Filecoin-aligned, locally simulated agent control workflow built on top of the high-level libp2p request/response helper.
It performs two one-shot exchanges over fresh protocol streams:
a
capability_queryto discover the agent’s supported task semanticsa
store_intentrequest that returns a simulated storage result
The example does not call Filecoin services, Synapse SDK, or external providers. It models only the control-plane messages and result semantics.
$ agentic-request-response-demo
Listener ready, listening on:
...
Copy the printed command into another terminal, for example:
$ agentic-request-response-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256
Capability response:
...
Store result summary:
status: complete
...
To demonstrate partial success with an unhealthy secondary provider:
$ agentic-request-response-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256 --copies 3
The full source code for this example is below:
1from __future__ import annotations
2
3import argparse
4import hashlib
5import json
6import logging
7import random
8import secrets
9import time
10
11import multiaddr
12import trio
13
14from libp2p import new_host
15from libp2p.crypto.secp256k1 import create_new_key_pair
16from libp2p.peer.peerinfo import info_from_p2p_addr
17from libp2p.request_response import JSONCodec, RequestContext, RequestResponse
18from libp2p.utils.address_validation import (
19 find_free_port,
20 get_available_interfaces,
21 get_optimal_binding_address,
22)
23
24from .agentic_protocol import (
25 PROTOCOL_ID,
26 REQUIRED_SESSION_PERMISSIONS,
27 build_capability_query,
28 build_store_intent,
29 process_request,
30)
31
32logging.basicConfig(level=logging.WARNING)
33logging.getLogger("multiaddr").setLevel(logging.WARNING)
34logging.getLogger("libp2p").setLevel(logging.WARNING)
35
36
37def _build_authorization(mode: str, expired_session: bool) -> dict[str, object]:
38 if mode == "root":
39 return {"mode": "root"}
40
41 expires_at = int(time.time()) - 60 if expired_session else int(time.time()) + 3600
42 return {
43 "mode": "session_key",
44 "expires_at": expires_at,
45 "permissions": list(REQUIRED_SESSION_PERMISSIONS),
46 }
47
48
49async def _handler(
50 request: dict[str, object], context: RequestContext
51) -> dict[str, object]:
52 del context
53 return process_request(request)
54
55
56def _make_task_id(name: str, size: int, copies: int, auth_mode: str) -> str:
57 digest = hashlib.sha256(f"{name}:{size}:{copies}:{auth_mode}".encode()).hexdigest()
58 return f"task-{digest[:8]}"
59
60
61def _print_capabilities(response: dict[str, object]) -> None:
62 print("Capability response:")
63 print(json.dumps(response, indent=2, sort_keys=True))
64
65
66def _print_store_result(response: dict[str, object]) -> None:
67 print("\nStore result summary:")
68 print(f" status: {response.get('status')}")
69 print(f" complete: {response.get('complete')}")
70 print(f" task_id: {response.get('task_id')}")
71 print(f" piece_cid: {response.get('piece_cid')}")
72
73 copies = response.get("copies", [])
74 if isinstance(copies, list) and copies:
75 print(" successful copies:")
76 for copy in copies:
77 if not isinstance(copy, dict):
78 continue
79 print(
80 " "
81 f"provider={copy.get('provider_id')} "
82 f"role={copy.get('role')} "
83 f"data_set_id={copy.get('data_set_id')} "
84 f"piece_id={copy.get('piece_id')}"
85 )
86
87 failed_attempts = response.get("failed_attempts", [])
88 if isinstance(failed_attempts, list) and failed_attempts:
89 print(" failed attempts:")
90 for attempt in failed_attempts:
91 if not isinstance(attempt, dict):
92 continue
93 print(
94 " "
95 f"provider={attempt.get('provider_id')} "
96 f"role={attempt.get('role')} "
97 f"error={attempt.get('error')}"
98 )
99
100 errors = response.get("errors", [])
101 if isinstance(errors, list) and errors:
102 print(" errors:")
103 for error in errors:
104 print(f" {error}")
105
106 notes = response.get("notes", [])
107 if isinstance(notes, list) and notes:
108 print(" notes:")
109 for note in notes:
110 print(f" {note}")
111
112
113async def run(
114 port: int,
115 destination: str | None,
116 name: str,
117 size: int,
118 copies: int,
119 with_cdn: bool,
120 auth_mode: str,
121 expired_session: bool,
122 seed: int | None = None,
123) -> None:
124 if port <= 0:
125 port = find_free_port()
126 listen_addr = get_available_interfaces(port)
127
128 if seed is not None:
129 random.seed(seed)
130 secret_number = random.getrandbits(32 * 8)
131 secret = secret_number.to_bytes(length=32, byteorder="big")
132 else:
133 secret = secrets.token_bytes(32)
134
135 host = new_host(key_pair=create_new_key_pair(secret))
136 rr = RequestResponse(host)
137 codec = JSONCodec()
138
139 async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
140 nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
141 print(f"I am {host.get_id().to_string()}")
142
143 if not destination:
144 rr.set_handler(PROTOCOL_ID, handler=_handler, codec=codec)
145 peer_id = host.get_id().to_string()
146 print("Listener ready, listening on:\n")
147 for addr in listen_addr:
148 print(f"{addr}/p2p/{peer_id}")
149
150 optimal_addr = get_optimal_binding_address(port)
151 optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
152 print(
153 "\nRun this from the same folder in another console:\n\n"
154 "agentic-request-response-demo "
155 f"-d {optimal_addr_with_peer} --name hello.txt --size 256\n\n"
156 "For a partial-success run that requests too many healthy copies:\n\n"
157 "agentic-request-response-demo "
158 f"-d {optimal_addr_with_peer} --name hello.txt --size 256 --copies 3\n"
159 )
160 print("Waiting for incoming requests...")
161 await trio.sleep_forever()
162
163 destination_str = destination
164 if destination_str is None:
165 raise ValueError("destination is required in dialer mode")
166
167 rr.set_handler(PROTOCOL_ID, handler=_handler, codec=codec)
168 maddr = multiaddr.Multiaddr(destination_str)
169 info = info_from_p2p_addr(maddr)
170 await host.connect(info)
171
172 capabilities = await rr.send_request(
173 peer_id=info.peer_id,
174 protocol_ids=[PROTOCOL_ID],
175 request=build_capability_query(),
176 codec=codec,
177 )
178 _print_capabilities(capabilities)
179
180 task_id = _make_task_id(name, size, copies, auth_mode)
181 store_result = await rr.send_request(
182 peer_id=info.peer_id,
183 protocol_ids=[PROTOCOL_ID],
184 request=build_store_intent(
185 task_id=task_id,
186 content_label=name,
187 declared_size_bytes=size,
188 copies=copies,
189 with_cdn=with_cdn,
190 dataset_metadata={"source": "agent-demo"},
191 piece_metadata={"filename": name},
192 authorization=_build_authorization(auth_mode, expired_session),
193 ),
194 codec=codec,
195 )
196 _print_store_result(store_result)
197 nursery.cancel_scope.cancel()
198
199
200def main() -> None:
201 description = """
202 Demonstrates a Filecoin-aligned, locally simulated agent workflow on top of
203 the high-level libp2p request/response helper. Run once without -d to start
204 a listener, then run again with -d to perform capability discovery and
205 submit a storage-style task request.
206 """
207 example_maddr = (
208 "/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
209 )
210 parser = argparse.ArgumentParser(description=description)
211 parser.add_argument("-p", "--port", default=0, type=int, help="source port")
212 parser.add_argument(
213 "-d",
214 "--destination",
215 type=str,
216 help=f"destination multiaddr string, e.g. {example_maddr}",
217 )
218 parser.add_argument(
219 "--name",
220 type=str,
221 default="hello.txt",
222 help="content label to include in the simulated storage request",
223 )
224 parser.add_argument(
225 "--size",
226 type=int,
227 default=256,
228 help="declared size in bytes for the simulated storage request",
229 )
230 parser.add_argument(
231 "--copies",
232 type=int,
233 default=2,
234 help="requested number of simulated storage copies",
235 )
236 parser.add_argument(
237 "--with-cdn",
238 action="store_true",
239 help="set the simulated CDN flag on the storage request",
240 )
241 parser.add_argument(
242 "--auth-mode",
243 choices=("root", "session"),
244 default="root",
245 help="authorization mode to send with the simulated task",
246 )
247 parser.add_argument(
248 "--expired-session",
249 action="store_true",
250 help="send an expired simulated session-key authorization",
251 )
252 parser.add_argument(
253 "-s",
254 "--seed",
255 type=int,
256 help="seed the RNG to make peer IDs reproducible",
257 )
258 args = parser.parse_args()
259
260 auth_mode = "session_key" if args.auth_mode == "session" else "root"
261 try:
262 trio.run(
263 run,
264 args.port,
265 args.destination,
266 args.name,
267 args.size,
268 args.copies,
269 args.with_cdn,
270 auth_mode,
271 args.expired_session,
272 args.seed,
273 )
274 except KeyboardInterrupt:
275 pass
276
277
278if __name__ == "__main__":
279 main()