A2A Payment Demo
This example demonstrates an A2A-shaped Filecoin payment and storage workflow built on top of the high-level libp2p request/response helper.
It keeps the transport fully inside py-libp2p but models a custom binding that resembles the A2A JSON-RPC task flow:
fetch an Agent Card-like capability document
send a SendMessage request that creates a task
receive TASK_STATE_AUTH_REQUIRED with a Filecoin Pay-style quote
send a follow-up authorization message for the same task
fetch the completed task with payment and storage artifacts
By default this transport-focused example remains local and uses the simulated execution backend. For the official A2A HTTP/JSON-RPC + SSE facade and the optional Synapse-backed execution path, see A2A HTTP Payment Demo.
$ a2a-payment-demo
Listener ready, listening on:
...
Copy the printed command into another terminal, for example:
$ a2a-payment-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256
Agent Card:
...
Task after initial SendMessage:
...
Task after payment authorization:
...
To demonstrate partial provider fulfilment while still completing the task:
$ a2a-payment-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256 --copies 3
The full source code for this example is below:
1from __future__ import annotations
2
3import argparse
4import json
5import logging
6import random
7import secrets
8
9import multiaddr
10import trio
11
12from libp2p import new_host
13from libp2p.crypto.secp256k1 import create_new_key_pair
14from libp2p.filecoin.address import DEMO_F410_PAYER
15from libp2p.peer.peerinfo import info_from_p2p_addr
16from libp2p.request_response import JSONCodec, RequestContext, RequestResponse
17from libp2p.utils.address_validation import (
18 find_free_port,
19 get_available_interfaces,
20 get_optimal_binding_address,
21)
22
23from .a2a_payment_protocol import (
24 DEFAULT_COPIES,
25 DEFAULT_LOCKUP_EPOCHS,
26 DEFAULT_PAYMENT_RATE_USDFC_PER_EPOCH,
27 PROTOCOL_ID,
28 A2APaymentDemoServer,
29 build_authorization_followup_request,
30 build_get_agent_card_request,
31 build_get_task_request,
32 build_send_message_request,
33)
34
# Keep the demo output readable: the named library loggers and the root
# logger all emit warnings and above only.
logging.getLogger("libp2p").setLevel(logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.basicConfig(level=logging.WARNING)
38
39
def _print_agent_card(card: dict[str, object]) -> None:
    """
    Print the Agent Card document to stdout.

    An ``Agent Card:`` heading is written first, followed by ``card``
    serialized as JSON with sorted keys and a two-space indent.
    """
    print("Agent Card:")
    rendered = json.dumps(card, sort_keys=True, indent=2)
    print(rendered)
43
44
def _extract_task(response: dict[str, object]) -> dict[str, object]:
    """
    Return the ``task`` object nested under ``result`` in a JSON-RPC response.

    Raises:
        ValueError: if ``result`` is absent or not a dict, or if
            ``result["task"]`` is absent or not a dict.

    """
    payload = response.get("result")
    if isinstance(payload, dict):
        candidate = payload.get("task")
        if isinstance(candidate, dict):
            return candidate
        raise ValueError("JSON-RPC result is missing task")
    raise ValueError("JSON-RPC response is missing result")
53
54
def _print_task(task: dict[str, object], *, title: str) -> None:
    """
    Print a task snapshot to stdout under a caller-supplied heading.

    A blank line and ``<title>:`` are written first, followed by ``task``
    serialized as JSON with sorted keys and a two-space indent.
    """
    heading = f"\n{title}:"
    print(heading)
    rendered = json.dumps(task, sort_keys=True, indent=2)
    print(rendered)
58
59
def _extract_quote(task: dict[str, object]) -> dict[str, object]:
    """
    Return the ``quote`` object stored in the task's ``metadata``.

    Raises:
        ValueError: if ``metadata`` is absent or not a dict, or if
            ``metadata["quote"]`` is absent or not a dict.

    """
    meta = task.get("metadata")
    if isinstance(meta, dict):
        candidate = meta.get("quote")
        if isinstance(candidate, dict):
            return candidate
        raise ValueError("task metadata is missing quote")
    raise ValueError("task is missing metadata")
68
69
def _extract_context_id(task: dict[str, object]) -> str:
    """
    Return the task's ``contextId`` string.

    Raises:
        ValueError: if ``contextId`` is absent or not a string.

    """
    value = task.get("contextId")
    if isinstance(value, str):
        return value
    raise ValueError("task is missing contextId")
75
76
def _extract_task_id(task: dict[str, object]) -> str:
    """
    Return the task's ``id`` string.

    Raises:
        ValueError: if ``id`` is absent or not a string.

    """
    value = task.get("id")
    if isinstance(value, str):
        return value
    raise ValueError("task is missing id")
82
83
def _extract_state(task: dict[str, object]) -> str:
    """
    Return the ``state`` string stored in the task's ``status`` object.

    Raises:
        ValueError: if ``status`` is absent or not a dict, or if
            ``status["state"]`` is absent or not a string.

    """
    status_obj = task.get("status")
    if isinstance(status_obj, dict):
        value = status_obj.get("state")
        if isinstance(value, str):
            return value
        raise ValueError("task status is missing state")
    raise ValueError("task is missing status")
92
93
async def _handler(
    server: A2APaymentDemoServer,
    request: dict[str, object],
    context: RequestContext,
) -> dict[str, object]:
    """
    Answer one decoded request by delegating to the demo server.

    The request context is accepted to match the handler signature but is
    not used; the reply is whatever ``server.process_request`` returns.
    """
    # The context carries nothing this handler needs, so drop it explicitly.
    del context
    reply = server.process_request(request)
    return reply
101
102
# Task states after which the dialer stops polling.
# NOTE(review): only the three states the demo already checked are listed;
# confirm against the server whether other terminal states can occur.
_TERMINAL_TASK_STATES = frozenset(
    {
        "TASK_STATE_COMPLETED",
        "TASK_STATE_FAILED",
        "TASK_STATE_CANCELED",
    }
)


def _derive_secret(seed: int | None) -> bytes:
    """Return the 32-byte secret used to create the host key pair."""
    if seed is None:
        return secrets.token_bytes(32)
    # A private Random instance yields the same bytes as seeding the
    # module-level generator, without disturbing global RNG state.
    secret_number = random.Random(seed).getrandbits(32 * 8)
    return secret_number.to_bytes(length=32, byteorder="big")


async def run(
    port: int,
    destination: str | None,
    name: str,
    size: int,
    copies: int,
    with_cdn: bool,
    payer: str,
    max_lockup_usdfc: int | None,
    payment_rate_usdfc_per_epoch: int,
    lockup_epochs: int,
    seed: int | None,
) -> None:
    """
    Run the demo as a listener or, when ``destination`` is given, as a dialer.

    Both modes start a host and register the request handler for
    ``PROTOCOL_ID``. Without a destination the function prints the listen
    addresses plus ready-to-copy dialer commands and then waits forever.
    With a destination it connects to that peer and performs, in order:
    Agent Card retrieval, the initial SendMessage request, the payment
    authorization follow-up, and polling of the task state.

    Args:
        port: TCP port to listen on; a free port is picked when ``<= 0``.
        destination: multiaddr (including ``/p2p/<peer id>``) of the listener
            to dial, or ``None``/empty for listener mode.
        name: content label sent in the storage request.
        size: declared size in bytes sent in the storage request.
        copies: requested number of storage copies.
        with_cdn: CDN flag sent in the storage request.
        payer: payer identifier sent with the authorization message.
        max_lockup_usdfc: lockup amount to authorize; when ``None`` the
            quote's ``depositNeededUsdfc`` is used.
        payment_rate_usdfc_per_epoch: payment rate sent in the storage request.
        lockup_epochs: lockup period sent in the storage request.
        seed: when given, makes the host key (and thus peer ID) reproducible.

    Raises:
        ValueError: if a response lacks an expected field (result, task,
            quote, ids, state or ``depositNeededUsdfc``).

    """
    if port <= 0:
        port = find_free_port()
    listen_addr = get_available_interfaces(port)

    host = new_host(key_pair=create_new_key_pair(_derive_secret(seed)))
    rr = RequestResponse(host)
    codec = JSONCodec()
    server = A2APaymentDemoServer()

    async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
        nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
        rr.set_handler(
            PROTOCOL_ID,
            handler=lambda request, context: _handler(server, request, context),
            codec=codec,
        )
        print(f"I am {host.get_id().to_string()}")

        if not destination:
            peer_id = host.get_id().to_string()
            print("Listener ready, listening on:\n")
            for addr in listen_addr:
                print(f"{addr}/p2p/{peer_id}")

            optimal_addr = get_optimal_binding_address(port)
            optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
            print(
                "\nRun this from the same folder in another console:\n\n"
                "a2a-payment-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256\n\n"
                "To simulate partial provider fulfilment with three "
                "requested copies:\n\n"
                "a2a-payment-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256 --copies 3\n"
            )
            print("Waiting for incoming requests...")
            await trio.sleep_forever()

        # Listener mode never gets here; this check narrows the type for the
        # dialer path below.
        destination_str = destination
        if destination_str is None:
            raise ValueError("destination is required in dialer mode")

        maddr = multiaddr.Multiaddr(destination_str)
        info = info_from_p2p_addr(maddr)
        await host.connect(info)

        # Step 1: fetch the Agent Card-like capability document.
        card_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_get_agent_card_request(),
            codec=codec,
        )
        card_result = card_response.get("result")
        if not isinstance(card_result, dict):
            raise ValueError("agent card response is missing result")
        _print_agent_card(card_result)

        # Step 2: create the task with the storage request.
        initial_task_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_send_message_request(
                request_id="send-message",
                message_id="msg-store-request",
                content_label=name,
                declared_size_bytes=size,
                copies=copies,
                with_cdn=with_cdn,
                payment_rate_usdfc_per_epoch=payment_rate_usdfc_per_epoch,
                lockup_epochs=lockup_epochs,
            ),
            codec=codec,
        )
        initial_task = _extract_task(initial_task_response)
        _print_task(initial_task, title="Task after initial SendMessage")

        quote = _extract_quote(initial_task)
        task_id = _extract_task_id(initial_task)
        context_id = _extract_context_id(initial_task)
        quoted_lockup = quote.get("depositNeededUsdfc")
        if not isinstance(quoted_lockup, int):
            raise ValueError("quote is missing depositNeededUsdfc")
        # An explicit override wins; otherwise authorize the quoted deposit.
        approved_lockup = (
            max_lockup_usdfc if max_lockup_usdfc is not None else quoted_lockup
        )

        # Step 3: authorize payment for the same task and context.
        followup_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_authorization_followup_request(
                request_id="authorize-payment",
                message_id="msg-authorize-payment",
                task_id=task_id,
                context_id=context_id,
                max_lockup_usdfc=approved_lockup,
                payer=payer,
            ),
            codec=codec,
        )
        auth_task = _extract_task(followup_response)
        _print_task(auth_task, title="Task after payment authorization")

        # Step 4: poll until the task is terminal or the budget is spent.
        initial_state = _extract_state(auth_task)
        task_state = initial_state
        print(f"\nWatching task progress ({task_state}):")
        poll_interval_seconds = 0.5
        max_polls = 10

        polls_sent = 0
        # Re-checking the state after every poll means a terminal state
        # reached on the last allowed poll is still detected and reported.
        while task_state not in _TERMINAL_TASK_STATES and polls_sent < max_polls:
            polls_sent += 1
            await trio.sleep(poll_interval_seconds)
            polled_response = await rr.send_request(
                peer_id=info.peer_id,
                protocol_ids=[PROTOCOL_ID],
                request=build_get_task_request(
                    request_id=f"poll-{polls_sent}", task_id=task_id
                ),
                codec=codec,
            )
            new_state = _extract_state(_extract_task(polled_response))
            if new_state != task_state:
                print(f" State changed: {task_state} -> {new_state}")
                task_state = new_state

        if task_state in _TERMINAL_TASK_STATES:
            print(" (Task reached terminal state)")
        else:
            print(f" (Task still in {task_state} after {max_polls} polls; giving up)")

        # Only re-print the task when polling observed a change; otherwise
        # the snapshot printed after authorization is already current.
        if task_state != initial_state:
            fetched_task_response = await rr.send_request(
                peer_id=info.peer_id,
                protocol_ids=[PROTOCOL_ID],
                request=build_get_task_request(request_id="get-final", task_id=task_id),
                codec=codec,
            )
            fetched_task = _extract_task(fetched_task_response)
            _print_task(fetched_task, title="Final task state (streaming complete)")
        # Stop the background peerstore cleanup task so the nursery can exit.
        nursery.cancel_scope.cancel()
263
264
def main() -> None:
    """
    Command-line entry point for the demo.

    Builds the argument parser, parses the command line and hands the
    options to ``run`` under ``trio.run``. An exit caused purely by
    ``KeyboardInterrupt`` (possibly nested in exception groups) returns
    quietly; every other exception is re-raised.
    """
    description = """
    Demonstrates an A2A-shaped Filecoin payment and storage workflow built on top
    of the high-level libp2p request/response helper. Run once without -d to
    start a listener, then run again with -d to perform Agent Card discovery,
    task creation, payment authorization, and final task retrieval.
    """
    example_maddr = (
        "/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )
    cli = argparse.ArgumentParser(description=description)

    # Networking options.
    cli.add_argument("-p", "--port", type=int, default=0, help="source port")
    cli.add_argument(
        "-d",
        "--destination",
        type=str,
        help=f"destination multiaddr string, e.g. {example_maddr}",
    )

    # Storage request options.
    cli.add_argument(
        "--name",
        default="hello.txt",
        type=str,
        help="content label to include in the storage request",
    )
    cli.add_argument(
        "--size",
        default=256,
        type=int,
        help="declared size in bytes for the storage request",
    )
    cli.add_argument(
        "--copies",
        default=DEFAULT_COPIES,
        type=int,
        help="requested number of storage copies",
    )
    cli.add_argument(
        "--with-cdn",
        action="store_true",
        help="set the simulated CDN flag on the storage request",
    )

    # Payment options.
    cli.add_argument(
        "--payer",
        default=DEMO_F410_PAYER,
        type=str,
        help="payer identifier as a Filecoin address (f410 for delegated/EAM, "
        "f0 for ID, f1 for SECP256K1)",
    )
    cli.add_argument(
        "--max-lockup-usdfc",
        type=int,
        help="override the authorized lockup amount; defaults to the quoted deposit",
    )
    cli.add_argument(
        "--payment-rate-usdfc-per-epoch",
        default=DEFAULT_PAYMENT_RATE_USDFC_PER_EPOCH,
        type=int,
        help="simulated Filecoin Pay rate per epoch",
    )
    cli.add_argument(
        "--lockup-epochs",
        default=DEFAULT_LOCKUP_EPOCHS,
        type=int,
        help="simulated Filecoin Pay lockup period in epochs",
    )

    # Reproducibility.
    cli.add_argument(
        "-s",
        "--seed",
        type=int,
        help="seed the RNG to make peer IDs reproducible",
    )
    options = cli.parse_args()

    try:
        # Positional order must match the parameter order of ``run``.
        trio.run(
            run,
            options.port,
            options.destination,
            options.name,
            options.size,
            options.copies,
            options.with_cdn,
            options.payer,
            options.max_lockup_usdfc,
            options.payment_rate_usdfc_per_epoch,
            options.lockup_epochs,
            options.seed,
        )
    except BaseException as exc:
        # Swallow only interrupts; anything else is a real failure.
        if not _is_keyboard_interrupt_exit(exc):
            raise
357
358
def _is_keyboard_interrupt_exit(exc: BaseException) -> bool:
    """
    Report whether ``exc`` consists solely of ``KeyboardInterrupt``.

    Returns ``True`` for a ``KeyboardInterrupt`` itself, or for an exception
    whose ``exceptions`` attribute is a non-empty tuple in which every member
    (checked recursively) also qualifies. Everything else yields ``False``.
    """
    if isinstance(exc, KeyboardInterrupt):
        return True

    children = getattr(exc, "exceptions", None)
    if isinstance(children, tuple) and children:
        for child in children:
            if not _is_keyboard_interrupt_exit(child):
                return False
        return True
    return False
368
369
# Run the demo CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()