A2A HTTP Payment Demo

This example exposes the payment demo through the official A2A Python SDK over HTTP JSON-RPC with Server-Sent Events (SSE) for streaming updates.

It keeps the same task semantics as the libp2p demo while adding a standards-facing surface:

  • GET /.well-known/agent-card.json for discovery

  • POST /rpc with SendMessage / GetTask / CancelTask / ListTasks

  • SendStreamingMessage over SSE for real-time task progress and artifact updates
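
As a rough illustration of the POST /rpc surface, the sketch below builds a JSON-RPC 2.0 request envelope for one of the methods listed above. The envelope fields (jsonrpc, id, method, params) come from the JSON-RPC 2.0 specification. The helper name build_rpc_request and the shape of params are assumptions made for illustration; consult the A2A SDK for the exact parameter schema of each method.

```python
import json
import uuid
from typing import Any, Optional

# Method names taken from the endpoint list above.
KNOWN_METHODS = {
    "SendMessage",
    "GetTask",
    "CancelTask",
    "ListTasks",
    "SendStreamingMessage",
}


def build_rpc_request(
    method: str, params: dict, request_id: Optional[str] = None
) -> dict:
    """Build a JSON-RPC 2.0 envelope (hypothetical helper, not part of the SDK)."""
    if method not in KNOWN_METHODS:
        raise ValueError(f"Unknown method: {method}")
    return {
        "jsonrpc": "2.0",
        "id": request_id or str(uuid.uuid4()),
        "method": method,
        "params": params,
    }


# Assumed params shape: a task is looked up by its "id".
request: dict[str, Any] = build_rpc_request(
    "GetTask", {"id": "task-123"}, request_id="1"
)
body = json.dumps(request)
print(body)
```

The resulting body would be sent with Content-Type: application/json to the /rpc path of a running server.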

The HTTP facade can run in two backend modes:

  • simulation: safe local mode, no external services required

  • synapse: real Synapse/Filecoin execution through a Node sidecar

Python requirements

Install the optional Python packages needed for the HTTP facade:

$ pip install "a2a-sdk[http-server]" uvicorn

Node sidecar requirements

Install the sidecar dependencies once:

$ cd examples/request_response/synapse_sidecar
$ npm install

Simulation mode

Run the official A2A server locally in simulation mode:

$ a2a-http-payment-demo --backend simulation --host 127.0.0.1 --port 9999

The Agent Card will advertise http://127.0.0.1:9999/rpc as the preferred JSON-RPC interface.
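
The advertised URL is derived from the command-line arguments. The sketch below mirrors the string composition used in the facade's main() and create_a2a_http_app() functions; the helper names advertised_rpc_url and agent_card_url are hypothetical and exist only for this illustration.

```python
from typing import Optional


def advertised_rpc_url(
    host: str,
    port: int,
    rpc_path: str = "/rpc",
    public_base_url: Optional[str] = None,
) -> str:
    """Compose the JSON-RPC URL the Agent Card would advertise."""
    # An explicit --public-base-url wins; otherwise fall back to host and port.
    base = public_base_url or f"http://{host}:{port}"
    return f"{base}{rpc_path}"


def agent_card_url(base_url: str) -> str:
    """Location of the discovery document relative to the base URL."""
    return f"{base_url}/.well-known/agent-card.json"


print(advertised_rpc_url("127.0.0.1", 9999))
print(agent_card_url("http://127.0.0.1:9999"))
```

When the server sits behind a proxy or is reached through a different hostname, passing --public-base-url keeps the advertised interface reachable for clients.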

Synapse-backed mode

To enable real Synapse/Filecoin execution, configure a funded testnet wallet and run the server with the Synapse backend:

$ export A2A_SYNAPSE_PRIVATE_KEY=0x...
$ export A2A_SYNAPSE_NETWORK=calibration
$ export A2A_SYNAPSE_EXECUTE_TRANSACTIONS=1
$ a2a-http-payment-demo --backend synapse --host 127.0.0.1 --port 9999

Important environment variables:

  • A2A_SYNAPSE_PRIVATE_KEY: private key used by the Node sidecar

  • A2A_SYNAPSE_NETWORK: calibration or mainnet (default: calibration)

  • A2A_SYNAPSE_RPC_URL: optional custom Filecoin RPC endpoint

  • A2A_SYNAPSE_PANDORA_ADDRESS: optional Pandora contract override

  • A2A_SYNAPSE_EXECUTE_TRANSACTIONS: set to 1 to allow real funding and approval transactions

  • A2A_SYNAPSE_VERIFY_DOWNLOAD: set to 0 to skip retrieval verification
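
To make the defaults concrete, here is a minimal sketch of how these variables could be read into a typed configuration object. It is not the implementation of SynapseNodeBridgeBackend.from_env(); the SynapseEnvConfig class and load_synapse_config function are hypothetical, and treating the private key as mandatory is an assumption. The defaults for transaction execution (off) and download verification (on) are inferred from the descriptions above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SynapseEnvConfig:
    """Hypothetical container for the documented environment variables."""

    private_key: str
    network: str
    rpc_url: Optional[str]
    pandora_address: Optional[str]
    execute_transactions: bool
    verify_download: bool


def load_synapse_config(env: Optional[Mapping[str, str]] = None) -> SynapseEnvConfig:
    env = os.environ if env is None else env
    private_key = env.get("A2A_SYNAPSE_PRIVATE_KEY", "")
    if not private_key:
        # Assumption: the sidecar cannot sign anything without a key.
        raise RuntimeError("A2A_SYNAPSE_PRIVATE_KEY must be set")
    network = env.get("A2A_SYNAPSE_NETWORK", "calibration")
    if network not in ("calibration", "mainnet"):
        raise ValueError(f"Unsupported network: {network}")
    return SynapseEnvConfig(
        private_key=private_key,
        network=network,
        rpc_url=env.get("A2A_SYNAPSE_RPC_URL") or None,
        pandora_address=env.get("A2A_SYNAPSE_PANDORA_ADDRESS") or None,
        # Real transactions are opt-in: only the value "1" enables them.
        execute_transactions=env.get("A2A_SYNAPSE_EXECUTE_TRANSACTIONS") == "1",
        # Verification is assumed on unless explicitly set to "0".
        verify_download=env.get("A2A_SYNAPSE_VERIFY_DOWNLOAD", "1") != "0",
    )


config = load_synapse_config({"A2A_SYNAPSE_PRIVATE_KEY": "0xabc"})
print(config.network, config.execute_transactions, config.verify_download)
```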

The Synapse bridge is intentionally conservative. If the wallet needs funding or service approval and A2A_SYNAPSE_EXECUTE_TRANSACTIONS is not enabled, the task fails with an explicit instruction instead of spending automatically.
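
The conservative behaviour described above can be sketched as a simple guard. This is an illustration of the policy only, not the bridge's actual code; the function name ensure_spending_allowed, the exception class, and the error wording are all hypothetical.

```python
class TransactionsNotEnabledError(RuntimeError):
    """Raised when spending would be required but has not been opted into."""


def ensure_spending_allowed(
    *, needs_deposit: bool, needs_approval: bool, execute_transactions: bool
) -> list:
    """Return the on-chain actions to perform, or fail with an instruction."""
    pending = []
    if needs_deposit:
        pending.append("deposit")
    if needs_approval:
        pending.append("service approval")
    if pending and not execute_transactions:
        # Fail loudly instead of spending funds automatically.
        raise TransactionsNotEnabledError(
            "Wallet requires "
            + " and ".join(pending)
            + "; set A2A_SYNAPSE_EXECUTE_TRANSACTIONS=1 to allow it."
        )
    return pending


# Nothing to do: safe regardless of the flag.
print(ensure_spending_allowed(
    needs_deposit=False, needs_approval=False, execute_transactions=False
))

# Spending required and explicitly enabled.
print(ensure_spending_allowed(
    needs_deposit=True, needs_approval=True, execute_transactions=True
))
```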

What the real backend adds

When the Synapse backend is enabled, the demo stops being purely simulated:

  • the quote path inspects the wallet, payment account, and allowance requirements

  • the store path can perform deposit and service-approval transactions

  • the storage receipt includes real CommP / piece identifiers, proof set data, provider address, and retrieval verification metadata

The full source code for the HTTP facade is below:

from __future__ import annotations

import argparse
from collections.abc import AsyncGenerator, Mapping
import importlib
from typing import TYPE_CHECKING, Any, cast

from .a2a_payment_service import (
    A2APaymentTaskService,
    extract_payload,
    validate_payment_authorization,
)
from .synapse_bridge import SynapseNodeBridgeBackend

A2A_HTTP_DEPS_AVAILABLE = True

if TYPE_CHECKING:

    class _ProtoField:
        def CopyFrom(self, other: Any) -> None: ...

    class _ProtoMessage:
        def __init__(self, *args: Any, **kwargs: Any) -> None: ...

    class _TaskState:
        TASK_STATE_UNSPECIFIED = 0

        @staticmethod
        def Name(value: object) -> str: ...

    class _GetTaskRequest:
        id: str

    class _CancelTaskRequest:
        id: str

    class _ListTasksRequest:
        status: int
        context_id: str
        include_artifacts: bool

        def HasField(self, field_name: str) -> bool: ...

    class _SendMessageRequest:
        message: Any

    class _SubscribeToTaskRequest:
        id: str

    class _TaskArtifactUpdateEvent(_ProtoMessage):
        artifact: _ProtoField
        last_chunk: bool

    class _TaskStatusUpdateEvent(_ProtoMessage):
        status: _ProtoField

    class _Starlette:
        def __init__(self, *args: Any, **kwargs: Any) -> None: ...

    ServerCallContext = Any
    Event = Any
    RequestHandler = object
    AgentCard = _ProtoMessage
    Artifact = _ProtoMessage
    CancelTaskRequest = _CancelTaskRequest
    DeleteTaskPushNotificationConfigRequest = _ProtoMessage
    GetExtendedAgentCardRequest = _ProtoMessage
    GetTaskPushNotificationConfigRequest = _ProtoMessage
    GetTaskRequest = _GetTaskRequest
    ListTaskPushNotificationConfigsRequest = _ProtoMessage
    ListTaskPushNotificationConfigsResponse = _ProtoMessage
    ListTasksRequest = _ListTasksRequest
    ListTasksResponse = _ProtoMessage
    Message = _ProtoMessage
    SendMessageRequest = _SendMessageRequest
    SubscribeToTaskRequest = _SubscribeToTaskRequest
    Task = _ProtoMessage
    TaskArtifactUpdateEvent = _TaskArtifactUpdateEvent
    TaskPushNotificationConfig = _ProtoMessage
    TaskState = _TaskState
    TaskStatusUpdateEvent = _TaskStatusUpdateEvent
    Starlette = _Starlette

    class InvalidParamsError(RuntimeError):
        def __init__(self, *, message: str | None = None) -> None: ...

    class PushNotificationNotSupportedError(RuntimeError):
        def __init__(self, *, message: str | None = None) -> None: ...

    class TaskNotCancelableError(RuntimeError):
        def __init__(self, *, message: str | None = None) -> None: ...

    class TaskNotFoundError(RuntimeError):
        def __init__(self, *, message: str | None = None) -> None: ...

    class UnsupportedOperationError(RuntimeError):
        def __init__(self, *, message: str | None = None) -> None: ...

    def create_agent_card_routes(*args: Any, **kwargs: Any) -> list[Any]: ...

    def create_jsonrpc_routes(*args: Any, **kwargs: Any) -> list[Any]: ...

    def MessageToDict(*args: Any, **kwargs: Any) -> dict[str, object]: ...

    def ParseDict(*args: Any, **kwargs: Any) -> Any: ...
else:
    try:
        from a2a.server.context import ServerCallContext
        from a2a.server.events import Event
        from a2a.server.request_handlers.request_handler import RequestHandler
        from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
        from a2a.types.a2a_pb2 import (
            AgentCard,
            Artifact,
            CancelTaskRequest,
            DeleteTaskPushNotificationConfigRequest,
            GetExtendedAgentCardRequest,
            GetTaskPushNotificationConfigRequest,
            GetTaskRequest,
            ListTaskPushNotificationConfigsRequest,
            ListTaskPushNotificationConfigsResponse,
            ListTasksRequest,
            ListTasksResponse,
            Message,
            SendMessageRequest,
            SubscribeToTaskRequest,
            Task,
            TaskArtifactUpdateEvent,
            TaskPushNotificationConfig,
            TaskState,
            TaskStatusUpdateEvent,
        )
        from a2a.utils.errors import (
            InvalidParamsError,
            PushNotificationNotSupportedError,
            TaskNotCancelableError,
            TaskNotFoundError,
            UnsupportedOperationError,
        )
        from google.protobuf.json_format import MessageToDict, ParseDict
        from starlette.applications import Starlette
    except ImportError:  # pragma: no cover - optional runtime dependency
        A2A_HTTP_DEPS_AVAILABLE = False
        ServerCallContext = cast(Any, object)
        Event = cast(Any, object)

        class RequestHandler:
            pass

        def create_agent_card_routes(*args: Any, **kwargs: Any) -> Any:
            raise RuntimeError("A2A HTTP dependencies are not installed")

        def create_jsonrpc_routes(*args: Any, **kwargs: Any) -> Any:
            raise RuntimeError("A2A HTTP dependencies are not installed")

        AgentCard = cast(Any, None)
        Artifact = cast(Any, None)
        CancelTaskRequest = cast(Any, None)
        DeleteTaskPushNotificationConfigRequest = cast(Any, None)
        GetExtendedAgentCardRequest = cast(Any, None)
        GetTaskPushNotificationConfigRequest = cast(Any, None)
        GetTaskRequest = cast(Any, None)
        ListTaskPushNotificationConfigsRequest = cast(Any, None)
        ListTaskPushNotificationConfigsResponse = cast(Any, None)
        ListTasksRequest = cast(Any, None)
        ListTasksResponse = cast(Any, None)
        Message = cast(Any, None)
        SendMessageRequest = cast(Any, None)
        SubscribeToTaskRequest = cast(Any, None)
        Task = cast(Any, None)
        TaskArtifactUpdateEvent = cast(Any, None)
        TaskPushNotificationConfig = cast(Any, None)
        TaskState = cast(Any, None)
        TaskStatusUpdateEvent = cast(Any, None)

        class _OptionalA2AError(RuntimeError):
            def __init__(self, *, message: str | None = None) -> None:
                super().__init__(message or "A2A HTTP dependencies are not installed")

        class InvalidParamsError(_OptionalA2AError):
            pass

        class PushNotificationNotSupportedError(_OptionalA2AError):
            pass

        class TaskNotCancelableError(_OptionalA2AError):
            pass

        class TaskNotFoundError(_OptionalA2AError):
            pass

        class UnsupportedOperationError(_OptionalA2AError):
            pass

        def MessageToDict(*args: Any, **kwargs: Any) -> Any:
            raise RuntimeError("A2A HTTP dependencies are not installed")

        def ParseDict(*args: Any, **kwargs: Any) -> Any:
            raise RuntimeError("A2A HTTP dependencies are not installed")

        Starlette = cast(Any, None)


def _require_a2a_sdk() -> None:
    if not A2A_HTTP_DEPS_AVAILABLE or AgentCard is None or Starlette is None:
        raise RuntimeError(
            "This demo requires the optional A2A HTTP dependencies. "
            "Install `a2a-sdk[http-server]` and `uvicorn` to run it."
        )


def _status_dict(task: Mapping[str, object]) -> Mapping[str, object]:
    status = task.get("status")
    if isinstance(status, Mapping):
        return status
    return {}


class A2APaymentRequestHandler(RequestHandler):
    def __init__(self, service: A2APaymentTaskService) -> None:
        self._service = service

    async def on_get_task(
        self,
        params: GetTaskRequest,
        context: ServerCallContext,
    ) -> Task | None:
        del context
        task = self._service.get_task(params.id)
        if task is None:
            return None
        return _parse_proto(task, Task)

    async def on_list_tasks(
        self, params: ListTasksRequest, context: ServerCallContext
    ) -> ListTasksResponse:
        del context
        state = (
            TaskState.Name(params.status)
            if params.status != TaskState.TASK_STATE_UNSPECIFIED
            else None
        )
        context_id = params.context_id or None
        tasks = self._service.list_tasks(
            context_id=context_id,
            state=state,
            include_artifacts=params.include_artifacts
            if params.HasField("include_artifacts")
            else False,
        )
        return ListTasksResponse(
            tasks=[_parse_proto(task, Task) for task in tasks],
            next_page_token="",
            page_size=len(tasks),
            total_size=len(tasks),
        )

    async def on_cancel_task(
        self,
        params: CancelTaskRequest,
        context: ServerCallContext,
    ) -> Task | None:
        del context
        try:
            task = self._service.cancel_task(params.id)
        except ValueError as exc:
            raise TaskNotCancelableError(message=str(exc)) from exc
        if task is None:
            return None
        return _parse_proto(task, Task)

    async def on_message_send(
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Task | Message:
        del context
        message = MessageToDict(params.message, preserving_proto_field_name=False)
        try:
            task = self._service.send_message(message)
        except KeyError as exc:
            raise TaskNotFoundError from exc
        except RuntimeError as exc:
            raise UnsupportedOperationError(message=str(exc)) from exc
        except ValueError as exc:
            raise InvalidParamsError(message=str(exc)) from exc
        return _parse_proto(task, Task)

    async def on_message_send_stream(
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> AsyncGenerator[Event]:
        del context
        message = MessageToDict(params.message, preserving_proto_field_name=False)
        task_id = message.get("taskId")
        payload = extract_payload(message)

        if not isinstance(task_id, str):
            try:
                task = self._service.send_message(message)
            except ValueError as exc:
                raise InvalidParamsError(message=str(exc)) from exc
            except RuntimeError as exc:
                raise UnsupportedOperationError(message=str(exc)) from exc
            yield _parse_proto(task, Task)
            return

        current_task = self._service.get_task(task_id)
        if current_task is None:
            raise TaskNotFoundError

        if isinstance(payload, Mapping):
            payment_authorization = payload.get("paymentAuthorization")
        else:
            payment_authorization = None

        if isinstance(payment_authorization, Mapping):
            metadata = current_task.get("metadata", {})
            quote = metadata.get("quote") if isinstance(metadata, Mapping) else None
            if isinstance(quote, Mapping):
                errors = validate_payment_authorization(payment_authorization, quote)
            else:
                errors = ["quoted task metadata is missing"]

            if not errors:
                working = self._service.build_streaming_working_task(
                    task_id=task_id,
                    context_id=str(current_task["contextId"]),
                    message="Payment authorization accepted. Executing storage task.",
                )
                yield _parse_proto(working, Task)
                self._service.send_message(message)
                final_task = self._service.complete_working_task(task_id)
                if final_task is None:
                    raise TaskNotFoundError
                artifacts = final_task.get("artifacts")
                if isinstance(artifacts, list):
                    for artifact in artifacts:
                        if not isinstance(artifact, Mapping):
                            continue
                        yield _build_artifact_update(
                            task_id=task_id,
                            context_id=str(final_task["contextId"]),
                            artifact_dict=artifact,
                        )
                yield _build_status_update(
                    task_id=task_id,
                    context_id=str(final_task["contextId"]),
                    task_dict=final_task,
                )
                return

        try:
            task = self._service.send_message(message)
        except ValueError as exc:
            raise InvalidParamsError(message=str(exc)) from exc
        except RuntimeError as exc:
            raise UnsupportedOperationError(message=str(exc)) from exc
        yield _parse_proto(task, Task)

    async def on_create_task_push_notification_config(
        self,
        params: TaskPushNotificationConfig,
        context: ServerCallContext,
    ) -> TaskPushNotificationConfig:
        del params, context
        raise PushNotificationNotSupportedError

    async def on_get_task_push_notification_config(
        self,
        params: GetTaskPushNotificationConfigRequest,
        context: ServerCallContext,
    ) -> TaskPushNotificationConfig:
        del params, context
        raise PushNotificationNotSupportedError

    async def on_subscribe_to_task(
        self,
        params: SubscribeToTaskRequest,
        context: ServerCallContext,
    ) -> AsyncGenerator[Event]:
        del context
        task = self._service.get_task(params.id)
        if task is None:
            raise TaskNotFoundError
        state = str(_status_dict(task).get("state", ""))
        if state in {
            "TASK_STATE_COMPLETED",
            "TASK_STATE_FAILED",
            "TASK_STATE_CANCELED",
            "TASK_STATE_REJECTED",
        }:
            raise UnsupportedOperationError(
                message="The task is in a terminal state and cannot be subscribed to"
            )
        yield _parse_proto(task, Task)

    async def on_list_task_push_notification_configs(
        self,
        params: ListTaskPushNotificationConfigsRequest,
        context: ServerCallContext,
    ) -> ListTaskPushNotificationConfigsResponse:
        del params, context
        raise PushNotificationNotSupportedError

    async def on_delete_task_push_notification_config(
        self,
        params: DeleteTaskPushNotificationConfigRequest,
        context: ServerCallContext,
    ) -> None:
        del params, context
        raise PushNotificationNotSupportedError

    async def on_get_extended_agent_card(
        self,
        params: GetExtendedAgentCardRequest,
        context: ServerCallContext,
    ) -> Any:
        del params, context
        raise UnsupportedOperationError(
            message="This demo does not provide an extended agent card"
        )


def build_task_service(backend: str) -> A2APaymentTaskService:
    if backend == "simulation":
        return A2APaymentTaskService()
    if backend == "synapse":
        return A2APaymentTaskService(
            execution_backend=SynapseNodeBridgeBackend.from_env()
        )
    raise ValueError(f"Unsupported backend: {backend}")


def create_a2a_http_app(
    *,
    backend: str = "simulation",
    public_base_url: str = "http://127.0.0.1:9999",
    rpc_path: str = "/rpc",
) -> Starlette:
    _require_a2a_sdk()
    service = build_task_service(backend)
    handler = A2APaymentRequestHandler(service)
    agent_card_dict = service.build_agent_card(
        interface_url=f"{public_base_url}{rpc_path}",
        protocol_binding="JSONRPC",
        streaming=True,
    )
    agent_card = _parse_proto(agent_card_dict, AgentCard)
    routes = [
        *create_agent_card_routes(agent_card),
        *create_jsonrpc_routes(handler, rpc_path),
    ]
    return cast(Any, Starlette)(routes=routes)


def _parse_proto(payload: Mapping[str, object], proto_cls: Any) -> Any:
    return ParseDict(dict(payload), proto_cls())


def _build_artifact_update(
    *, task_id: str, context_id: str, artifact_dict: Mapping[str, object]
) -> TaskArtifactUpdateEvent:
    artifact = _parse_proto(artifact_dict, Artifact)
    event = TaskArtifactUpdateEvent(task_id=task_id, context_id=context_id)
    event.artifact.CopyFrom(artifact)
    event.last_chunk = True
    return event


def _build_status_update(
    *, task_id: str, context_id: str, task_dict: Mapping[str, object]
) -> TaskStatusUpdateEvent:
    task = _parse_proto(task_dict, Task)
    event = TaskStatusUpdateEvent(task_id=task_id, context_id=context_id)
    event.status.CopyFrom(task.status)
    return event


def main() -> None:
    _require_a2a_sdk()
    parser = argparse.ArgumentParser(
        description=(
            "Runs the A2A HTTP/JSON-RPC + SSE facade for the Filecoin payment demo. "
            "Use --backend synapse for real Synapse-backed execution."
        )
    )
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", default=9999, type=int)
    parser.add_argument(
        "--backend",
        choices=("simulation", "synapse"),
        default="simulation",
        help="execution backend for storage tasks",
    )
    parser.add_argument(
        "--public-base-url",
        default=None,
        help="base URL to advertise in the Agent Card",
    )
    parser.add_argument(
        "--rpc-path",
        default="/rpc",
        help="JSON-RPC endpoint path exposed by the A2A server",
    )
    args = parser.parse_args()

    public_base_url = args.public_base_url or f"http://{args.host}:{args.port}"
    app = create_a2a_http_app(
        backend=args.backend,
        public_base_url=public_base_url,
        rpc_path=args.rpc_path,
    )

    try:
        uvicorn = importlib.import_module("uvicorn")
    except ImportError as exc:  # pragma: no cover - optional runtime dependency
        raise SystemExit("Install `uvicorn` to run the HTTP A2A demo server.") from exc

    uvicorn.run(app, host=args.host, port=args.port)


if __name__ == "__main__":  # pragma: no cover
    main()
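
The streaming handler above emits events in a fixed order once a payment authorization validates: a working task snapshot, one artifact update per artifact, then a final status update. The sketch below reproduces only that ordering with plain dictionaries so it runs without the A2A SDK. The "kind" labels, the dictionary layout, and the stream_paid_task name are hypothetical; the real handler yields protobuf Task, TaskArtifactUpdateEvent, and TaskStatusUpdateEvent messages.

```python
import asyncio
from typing import Any, AsyncGenerator


async def stream_paid_task(
    task_id: str, final_task: dict
) -> AsyncGenerator[dict, None]:
    """Yield events in the same order as the accepted-payment branch."""
    context_id = str(final_task["contextId"])

    # 1. Tell the client that execution has started.
    yield {"kind": "task", "taskId": task_id, "state": "TASK_STATE_WORKING"}

    # 2. One update per artifact; non-dict entries are skipped.
    for artifact in final_task.get("artifacts") or []:
        if not isinstance(artifact, dict):
            continue
        yield {
            "kind": "artifact-update",
            "taskId": task_id,
            "contextId": context_id,
            "artifact": artifact,
            "lastChunk": True,
        }

    # 3. Close the stream with the final status.
    yield {
        "kind": "status-update",
        "taskId": task_id,
        "contextId": context_id,
        "status": final_task["status"],
    }


async def collect(task_id: str, final_task: dict) -> list:
    return [event async for event in stream_paid_task(task_id, final_task)]


demo_task: dict[str, Any] = {
    "contextId": "ctx-1",
    "status": {"state": "TASK_STATE_COMPLETED"},
    "artifacts": [{"artifactId": "receipt"}],
}
events = asyncio.run(collect("task-1", demo_task))
print([event["kind"] for event in events])
# ['task', 'artifact-update', 'status-update']
```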