A2A Payment Demo

This example demonstrates an A2A-shaped Filecoin payment and storage workflow built on top of the high-level libp2p request/response helper.

It keeps the transport fully inside py-libp2p but models a custom binding that resembles the A2A JSON-RPC task flow:

  • fetch an Agent Card-like capability document

  • send a SendMessage request that creates a task

  • receive TASK_STATE_AUTH_REQUIRED with a Filecoin Pay-style quote

  • send a follow-up authorization message for the same task

  • fetch the completed task with payment and storage artifacts

By default, this transport-focused example stays local and uses the simulated execution backend. For the official A2A HTTP/JSON-RPC + SSE facade and the optional Synapse-backed execution path, see the A2A HTTP Payment Demo.

$ a2a-payment-demo
Listener ready, listening on:
...

Copy the printed command into another terminal, for example:

$ a2a-payment-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256
Agent Card:
...
Task after initial SendMessage:
...
Task after payment authorization:
...

To demonstrate partial provider fulfilment while still completing the task:

$ a2a-payment-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --name hello.txt --size 256 --copies 3

The full source code for this example is below:

  1from __future__ import annotations
  2
  3import argparse
  4import json
  5import logging
  6import random
  7import secrets
  8
  9import multiaddr
 10import trio
 11
 12from libp2p import new_host
 13from libp2p.crypto.secp256k1 import create_new_key_pair
 14from libp2p.filecoin.address import DEMO_F410_PAYER
 15from libp2p.peer.peerinfo import info_from_p2p_addr
 16from libp2p.request_response import JSONCodec, RequestContext, RequestResponse
 17from libp2p.utils.address_validation import (
 18    find_free_port,
 19    get_available_interfaces,
 20    get_optimal_binding_address,
 21)
 22
 23from .a2a_payment_protocol import (
 24    DEFAULT_COPIES,
 25    DEFAULT_LOCKUP_EPOCHS,
 26    DEFAULT_PAYMENT_RATE_USDFC_PER_EPOCH,
 27    PROTOCOL_ID,
 28    A2APaymentDemoServer,
 29    build_authorization_followup_request,
 30    build_get_agent_card_request,
 31    build_get_task_request,
 32    build_send_message_request,
 33)
 34
 35logging.basicConfig(level=logging.WARNING)
 36logging.getLogger("multiaddr").setLevel(logging.WARNING)
 37logging.getLogger("libp2p").setLevel(logging.WARNING)
 38
 39
 40def _print_agent_card(card: dict[str, object]) -> None:
 41    print("Agent Card:")
 42    print(json.dumps(card, indent=2, sort_keys=True))
 43
 44
 45def _extract_task(response: dict[str, object]) -> dict[str, object]:
 46    result = response.get("result")
 47    if not isinstance(result, dict):
 48        raise ValueError("JSON-RPC response is missing result")
 49    task = result.get("task")
 50    if not isinstance(task, dict):
 51        raise ValueError("JSON-RPC result is missing task")
 52    return task
 53
 54
 55def _print_task(task: dict[str, object], *, title: str) -> None:
 56    print(f"\n{title}:")
 57    print(json.dumps(task, indent=2, sort_keys=True))
 58
 59
 60def _extract_quote(task: dict[str, object]) -> dict[str, object]:
 61    metadata = task.get("metadata")
 62    if not isinstance(metadata, dict):
 63        raise ValueError("task is missing metadata")
 64    quote = metadata.get("quote")
 65    if not isinstance(quote, dict):
 66        raise ValueError("task metadata is missing quote")
 67    return quote
 68
 69
 70def _extract_context_id(task: dict[str, object]) -> str:
 71    context_id = task.get("contextId")
 72    if not isinstance(context_id, str):
 73        raise ValueError("task is missing contextId")
 74    return context_id
 75
 76
 77def _extract_task_id(task: dict[str, object]) -> str:
 78    task_id = task.get("id")
 79    if not isinstance(task_id, str):
 80        raise ValueError("task is missing id")
 81    return task_id
 82
 83
 84def _extract_state(task: dict[str, object]) -> str:
 85    status = task.get("status")
 86    if not isinstance(status, dict):
 87        raise ValueError("task is missing status")
 88    state = status.get("state")
 89    if not isinstance(state, str):
 90        raise ValueError("task status is missing state")
 91    return state
 92
 93
 94async def _handler(
 95    server: A2APaymentDemoServer,
 96    request: dict[str, object],
 97    context: RequestContext,
 98) -> dict[str, object]:
 99    del context
100    return server.process_request(request)
101
102
async def run(
    port: int,
    destination: str | None,
    name: str,
    size: int,
    copies: int,
    with_cdn: bool,
    payer: str,
    max_lockup_usdfc: int | None,
    payment_rate_usdfc_per_epoch: int,
    lockup_epochs: int,
    seed: int | None,
) -> None:
    """
    Run the demo as a listener, or as a dialer when ``destination`` is given.

    Both modes start a host and register the request handler for
    ``PROTOCOL_ID``. Without a destination the function prints the listen
    addresses plus ready-to-copy dialer commands and then waits forever. With
    a destination it connects to that peer and performs, in order: an agent
    card request, a SendMessage request, an authorization follow-up for the
    returned task, and then polls the task until it reaches a terminal state
    or the poll budget is exhausted.

    :param port: TCP port to listen on; a free port is picked when ``<= 0``
    :param destination: multiaddr of the peer to dial, or ``None``/empty to
        run as a listener
    :param name: content label sent in the SendMessage request
    :param size: declared size in bytes sent in the SendMessage request
    :param copies: requested number of copies sent in the SendMessage request
    :param with_cdn: CDN flag sent in the SendMessage request
    :param payer: payer identifier sent in the authorization follow-up
    :param max_lockup_usdfc: lockup amount to authorize; when ``None`` the
        quote's ``depositNeededUsdfc`` is used
    :param payment_rate_usdfc_per_epoch: rate sent in the SendMessage request
    :param lockup_epochs: lockup period sent in the SendMessage request
    :param seed: when not ``None``, seeds the RNG used to derive the host key
        so the peer ID is reproducible
    :raises ValueError: if a response lacks an expected field
    """
    if port <= 0:
        port = find_free_port()
    listen_addr = get_available_interfaces(port)

    if seed is not None:
        # Deterministic 32-byte secret so repeated runs get the same peer ID.
        random.seed(seed)
        secret_number = random.getrandbits(32 * 8)
        secret = secret_number.to_bytes(length=32, byteorder="big")
    else:
        secret = secrets.token_bytes(32)

    host = new_host(key_pair=create_new_key_pair(secret))
    rr = RequestResponse(host)
    codec = JSONCodec()
    server = A2APaymentDemoServer()

    async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
        nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
        rr.set_handler(
            PROTOCOL_ID,
            handler=lambda request, context: _handler(server, request, context),
            codec=codec,
        )
        print(f"I am {host.get_id().to_string()}")

        if not destination:
            peer_id = host.get_id().to_string()
            print("Listener ready, listening on:\n")
            for addr in listen_addr:
                print(f"{addr}/p2p/{peer_id}")

            optimal_addr = get_optimal_binding_address(port)
            optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
            print(
                "\nRun this from the same folder in another console:\n\n"
                "a2a-payment-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256\n\n"
                "To simulate partial provider fulfilment with three "
                "requested copies:\n\n"
                "a2a-payment-demo "
                f"-d {optimal_addr_with_peer} --name hello.txt --size 256 --copies 3\n"
            )
            print("Waiting for incoming requests...")
            # Listener mode only ends through cancellation (e.g. Ctrl-C).
            await trio.sleep_forever()

        destination_str = destination
        if destination_str is None:
            raise ValueError("destination is required in dialer mode")

        maddr = multiaddr.Multiaddr(destination_str)
        info = info_from_p2p_addr(maddr)
        await host.connect(info)

        # Step 1: discover the remote agent's capabilities.
        card_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_get_agent_card_request(),
            codec=codec,
        )
        card_result = card_response.get("result")
        if not isinstance(card_result, dict):
            raise ValueError("agent card response is missing result")
        _print_agent_card(card_result)

        # Step 2: create the task.
        initial_task_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_send_message_request(
                request_id="send-message",
                message_id="msg-store-request",
                content_label=name,
                declared_size_bytes=size,
                copies=copies,
                with_cdn=with_cdn,
                payment_rate_usdfc_per_epoch=payment_rate_usdfc_per_epoch,
                lockup_epochs=lockup_epochs,
            ),
            codec=codec,
        )
        initial_task = _extract_task(initial_task_response)
        _print_task(initial_task, title="Task after initial SendMessage")

        quote = _extract_quote(initial_task)
        task_id = _extract_task_id(initial_task)
        context_id = _extract_context_id(initial_task)
        quoted_lockup = quote.get("depositNeededUsdfc")
        if not isinstance(quoted_lockup, int):
            raise ValueError("quote is missing depositNeededUsdfc")
        approved_lockup = (
            max_lockup_usdfc if max_lockup_usdfc is not None else quoted_lockup
        )

        # Step 3: authorize payment for the same task.
        followup_response = await rr.send_request(
            peer_id=info.peer_id,
            protocol_ids=[PROTOCOL_ID],
            request=build_authorization_followup_request(
                request_id="authorize-payment",
                message_id="msg-authorize-payment",
                task_id=task_id,
                context_id=context_id,
                max_lockup_usdfc=approved_lockup,
                payer=payer,
            ),
            codec=codec,
        )
        auth_task = _extract_task(followup_response)
        _print_task(auth_task, title="Task after payment authorization")

        # Step 4: poll until the task settles or the poll budget runs out.
        state_after_auth = _extract_state(auth_task)
        task_state = state_after_auth
        print(f"\nWatching task progress ({task_state}):")
        _STREAMING_INTERVAL = 0.5
        _MAX_POLLS = 10
        # NOTE(review): TASK_STATE_REJECTED is added on the assumption that
        # the server follows the A2A task lifecycle, where "rejected" is
        # terminal too; harmless if this demo server never emits it.
        terminal_states = frozenset(
            {
                "TASK_STATE_COMPLETED",
                "TASK_STATE_FAILED",
                "TASK_STATE_CANCELED",
                "TASK_STATE_REJECTED",
            }
        )

        for poll_index in range(1, _MAX_POLLS + 1):
            if task_state in terminal_states:
                break

            await trio.sleep(_STREAMING_INTERVAL)
            polled_response = await rr.send_request(
                peer_id=info.peer_id,
                protocol_ids=[PROTOCOL_ID],
                request=build_get_task_request(
                    request_id=f"poll-{poll_index}", task_id=task_id
                ),
                codec=codec,
            )
            polled_task = _extract_task(polled_response)
            new_state = _extract_state(polled_task)
            if new_state != task_state:
                print(f"  State changed: {task_state} -> {new_state}")
                task_state = new_state

        # Checked after the loop so a terminal state reached on the very last
        # poll is still acknowledged, and a timeout is reported, not silent.
        if task_state in terminal_states:
            print("  (Task reached terminal state)")
        else:
            print(f"  (Task still {task_state} after {_MAX_POLLS} polls; giving up)")

        # Only re-fetch and print when polling observed a state change.
        if task_state != state_after_auth:
            fetched_task_response = await rr.send_request(
                peer_id=info.peer_id,
                protocol_ids=[PROTOCOL_ID],
                request=build_get_task_request(request_id="get-final", task_id=task_id),
                codec=codec,
            )
            fetched_task = _extract_task(fetched_task_response)
            _print_task(fetched_task, title="Final task state (streaming complete)")
        # Stop the background task started in the nursery so the block exits.
        nursery.cancel_scope.cancel()
263
264
def main() -> None:
    """
    Parse the command-line options and run the demo under trio.

    Returns quietly when the run ends because of a ``KeyboardInterrupt``
    (directly or nested inside an exception group); any other exception is
    re-raised.
    """
    description = """
    Demonstrates an A2A-shaped Filecoin payment and storage workflow built on top
    of the high-level libp2p request/response helper. Run once without -d to
    start a listener, then run again with -d to perform Agent Card discovery,
    task creation, payment authorization, and final task retrieval.
    """
    example_maddr = (
        "/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )

    cli = argparse.ArgumentParser(description=description)
    add_option = cli.add_argument

    # Networking options.
    add_option("-p", "--port", default=0, type=int, help="source port")
    add_option(
        "-d",
        "--destination",
        type=str,
        help=f"destination multiaddr string, e.g. {example_maddr}",
    )

    # Storage request options.
    add_option(
        "--name",
        type=str,
        default="hello.txt",
        help="content label to include in the storage request",
    )
    add_option(
        "--size",
        type=int,
        default=256,
        help="declared size in bytes for the storage request",
    )
    add_option(
        "--copies",
        type=int,
        default=DEFAULT_COPIES,
        help="requested number of storage copies",
    )
    add_option(
        "--with-cdn",
        action="store_true",
        help="set the simulated CDN flag on the storage request",
    )

    # Payment options.
    add_option(
        "--payer",
        type=str,
        default=DEMO_F410_PAYER,
        help="payer identifier as a Filecoin address (f410 for delegated/EAM, "
        "f0 for ID, f1 for SECP256K1)",
    )
    add_option(
        "--max-lockup-usdfc",
        type=int,
        help="override the authorized lockup amount; defaults to the quoted deposit",
    )
    add_option(
        "--payment-rate-usdfc-per-epoch",
        type=int,
        default=DEFAULT_PAYMENT_RATE_USDFC_PER_EPOCH,
        help="simulated Filecoin Pay rate per epoch",
    )
    add_option(
        "--lockup-epochs",
        type=int,
        default=DEFAULT_LOCKUP_EPOCHS,
        help="simulated Filecoin Pay lockup period in epochs",
    )

    # Reproducibility.
    add_option(
        "-s",
        "--seed",
        type=int,
        help="seed the RNG to make peer IDs reproducible",
    )
    opts = cli.parse_args()

    # Positional arguments for ``run``, in its parameter order.
    run_args = (
        opts.port,
        opts.destination,
        opts.name,
        opts.size,
        opts.copies,
        opts.with_cdn,
        opts.payer,
        opts.max_lockup_usdfc,
        opts.payment_rate_usdfc_per_epoch,
        opts.lockup_epochs,
        opts.seed,
    )

    try:
        trio.run(run, *run_args)
    except BaseException as exc:
        # Ctrl-C is the normal way to stop the listener; anything else is a
        # real failure and must propagate.
        if not _is_keyboard_interrupt_exit(exc):
            raise
357
358
359def _is_keyboard_interrupt_exit(exc: BaseException) -> bool:
360    if isinstance(exc, KeyboardInterrupt):
361        return True
362
363    nested = getattr(exc, "exceptions", None)
364    if not isinstance(nested, tuple) or not nested:
365        return False
366
367    return all(_is_keyboard_interrupt_exit(child) for child in nested)
368
369
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()