A2A HTTP Payment Demo
This example exposes the payment demo through the official A2A Python SDK over HTTP JSON-RPC with Server-Sent Events (SSE) for streaming updates.
It keeps the same task semantics as the libp2p demo while adding a standards- facing surface:
GET /.well-known/agent-card.jsonfor discoveryPOST /rpcwithSendMessage/GetTask/CancelTask/ListTasksSendStreamingMessageover SSE for real-time task progress and artifact updates
The HTTP facade can run in two backend modes:
simulation: safe local mode, no external services requiredsynapse: real Synapse/Filecoin execution through a Node sidecar
Python requirements
Install the optional Python packages needed for the HTTP facade:
$ pip install "a2a-sdk[http-server]" uvicorn
Node sidecar requirements
Install the sidecar dependencies once:
$ cd examples/request_response/synapse_sidecar
$ npm install
Simulation mode
Run the official A2A server locally in simulation mode:
$ a2a-http-payment-demo --backend simulation --host 127.0.0.1 --port 9999
The Agent Card will advertise http://127.0.0.1:9999/rpc as the preferred
JSON-RPC interface.
Synapse-backed mode
To enable real Synapse/Filecoin execution, configure a funded testnet wallet and run the server with the Synapse backend:
$ export A2A_SYNAPSE_PRIVATE_KEY=0x...
$ export A2A_SYNAPSE_NETWORK=calibration
$ export A2A_SYNAPSE_EXECUTE_TRANSACTIONS=1
$ a2a-http-payment-demo --backend synapse --host 127.0.0.1 --port 9999
Important environment variables:
A2A_SYNAPSE_PRIVATE_KEY: private key used by the Node sidecarA2A_SYNAPSE_NETWORK:calibrationormainnet(default:calibration)A2A_SYNAPSE_RPC_URL: optional custom Filecoin RPC endpointA2A_SYNAPSE_PANDORA_ADDRESS: optional Pandora contract overrideA2A_SYNAPSE_EXECUTE_TRANSACTIONS: set to1to allow real funding and approval transactionsA2A_SYNAPSE_VERIFY_DOWNLOAD: set to0to skip retrieval verification
The Synapse bridge is intentionally conservative. If the wallet needs funding or
service approval and A2A_SYNAPSE_EXECUTE_TRANSACTIONS is not enabled, the
task fails with an explicit instruction instead of spending automatically.
What the real backend adds
When the Synapse backend is enabled, the demo stops being purely simulated:
the quote path inspects the wallet, payment account, and allowance requirements
the store path can perform deposit and service-approval transactions
the storage receipt includes real
CommP/ piece identifiers, proof set data, provider address, and retrieval verification metadata
The full source code for the HTTP facade is below:
1from __future__ import annotations
2
3import argparse
4from collections.abc import AsyncGenerator, Mapping
5import importlib
6from typing import TYPE_CHECKING, Any, cast
7
8from .a2a_payment_service import (
9 A2APaymentTaskService,
10 extract_payload,
11 validate_payment_authorization,
12)
13from .synapse_bridge import SynapseNodeBridgeBackend
14
15A2A_HTTP_DEPS_AVAILABLE = True
16
17if TYPE_CHECKING:
18
19 class _ProtoField:
20 def CopyFrom(self, other: Any) -> None: ...
21
22 class _ProtoMessage:
23 def __init__(self, *args: Any, **kwargs: Any) -> None: ...
24
25 class _TaskState:
26 TASK_STATE_UNSPECIFIED = 0
27
28 @staticmethod
29 def Name(value: object) -> str: ...
30
31 class _GetTaskRequest:
32 id: str
33
34 class _CancelTaskRequest:
35 id: str
36
37 class _ListTasksRequest:
38 status: int
39 context_id: str
40 include_artifacts: bool
41
42 def HasField(self, field_name: str) -> bool: ...
43
44 class _SendMessageRequest:
45 message: Any
46
47 class _SubscribeToTaskRequest:
48 id: str
49
50 class _TaskArtifactUpdateEvent(_ProtoMessage):
51 artifact: _ProtoField
52 last_chunk: bool
53
54 class _TaskStatusUpdateEvent(_ProtoMessage):
55 status: _ProtoField
56
57 class _Starlette:
58 def __init__(self, *args: Any, **kwargs: Any) -> None: ...
59
60 ServerCallContext = Any
61 Event = Any
62 RequestHandler = object
63 AgentCard = _ProtoMessage
64 Artifact = _ProtoMessage
65 CancelTaskRequest = _CancelTaskRequest
66 DeleteTaskPushNotificationConfigRequest = _ProtoMessage
67 GetExtendedAgentCardRequest = _ProtoMessage
68 GetTaskPushNotificationConfigRequest = _ProtoMessage
69 GetTaskRequest = _GetTaskRequest
70 ListTaskPushNotificationConfigsRequest = _ProtoMessage
71 ListTaskPushNotificationConfigsResponse = _ProtoMessage
72 ListTasksRequest = _ListTasksRequest
73 ListTasksResponse = _ProtoMessage
74 Message = _ProtoMessage
75 SendMessageRequest = _SendMessageRequest
76 SubscribeToTaskRequest = _SubscribeToTaskRequest
77 Task = _ProtoMessage
78 TaskArtifactUpdateEvent = _TaskArtifactUpdateEvent
79 TaskPushNotificationConfig = _ProtoMessage
80 TaskState = _TaskState
81 TaskStatusUpdateEvent = _TaskStatusUpdateEvent
82 Starlette = _Starlette
83
84 class InvalidParamsError(RuntimeError):
85 def __init__(self, *, message: str | None = None) -> None: ...
86
87 class PushNotificationNotSupportedError(RuntimeError):
88 def __init__(self, *, message: str | None = None) -> None: ...
89
90 class TaskNotCancelableError(RuntimeError):
91 def __init__(self, *, message: str | None = None) -> None: ...
92
93 class TaskNotFoundError(RuntimeError):
94 def __init__(self, *, message: str | None = None) -> None: ...
95
96 class UnsupportedOperationError(RuntimeError):
97 def __init__(self, *, message: str | None = None) -> None: ...
98
99 def create_agent_card_routes(*args: Any, **kwargs: Any) -> list[Any]: ...
100
101 def create_jsonrpc_routes(*args: Any, **kwargs: Any) -> list[Any]: ...
102
103 def MessageToDict(*args: Any, **kwargs: Any) -> dict[str, object]: ...
104
105 def ParseDict(*args: Any, **kwargs: Any) -> Any: ...
106else:
107 try:
108 from a2a.server.context import ServerCallContext
109 from a2a.server.events import Event
110 from a2a.server.request_handlers.request_handler import RequestHandler
111 from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
112 from a2a.types.a2a_pb2 import (
113 AgentCard,
114 Artifact,
115 CancelTaskRequest,
116 DeleteTaskPushNotificationConfigRequest,
117 GetExtendedAgentCardRequest,
118 GetTaskPushNotificationConfigRequest,
119 GetTaskRequest,
120 ListTaskPushNotificationConfigsRequest,
121 ListTaskPushNotificationConfigsResponse,
122 ListTasksRequest,
123 ListTasksResponse,
124 Message,
125 SendMessageRequest,
126 SubscribeToTaskRequest,
127 Task,
128 TaskArtifactUpdateEvent,
129 TaskPushNotificationConfig,
130 TaskState,
131 TaskStatusUpdateEvent,
132 )
133 from a2a.utils.errors import (
134 InvalidParamsError,
135 PushNotificationNotSupportedError,
136 TaskNotCancelableError,
137 TaskNotFoundError,
138 UnsupportedOperationError,
139 )
140 from google.protobuf.json_format import MessageToDict, ParseDict
141 from starlette.applications import Starlette
142 except ImportError: # pragma: no cover - optional runtime dependency
143 A2A_HTTP_DEPS_AVAILABLE = False
144 ServerCallContext = cast(Any, object)
145 Event = cast(Any, object)
146
147 class RequestHandler:
148 pass
149
150 def create_agent_card_routes(*args: Any, **kwargs: Any) -> Any:
151 raise RuntimeError("A2A HTTP dependencies are not installed")
152
153 def create_jsonrpc_routes(*args: Any, **kwargs: Any) -> Any:
154 raise RuntimeError("A2A HTTP dependencies are not installed")
155
156 AgentCard = cast(Any, None)
157 Artifact = cast(Any, None)
158 CancelTaskRequest = cast(Any, None)
159 DeleteTaskPushNotificationConfigRequest = cast(Any, None)
160 GetExtendedAgentCardRequest = cast(Any, None)
161 GetTaskPushNotificationConfigRequest = cast(Any, None)
162 GetTaskRequest = cast(Any, None)
163 ListTaskPushNotificationConfigsRequest = cast(Any, None)
164 ListTaskPushNotificationConfigsResponse = cast(Any, None)
165 ListTasksRequest = cast(Any, None)
166 ListTasksResponse = cast(Any, None)
167 Message = cast(Any, None)
168 SendMessageRequest = cast(Any, None)
169 SubscribeToTaskRequest = cast(Any, None)
170 Task = cast(Any, None)
171 TaskArtifactUpdateEvent = cast(Any, None)
172 TaskPushNotificationConfig = cast(Any, None)
173 TaskState = cast(Any, None)
174 TaskStatusUpdateEvent = cast(Any, None)
175
176 class _OptionalA2AError(RuntimeError):
177 def __init__(self, *, message: str | None = None) -> None:
178 super().__init__(message or "A2A HTTP dependencies are not installed")
179
180 class InvalidParamsError(_OptionalA2AError):
181 pass
182
183 class PushNotificationNotSupportedError(_OptionalA2AError):
184 pass
185
186 class TaskNotCancelableError(_OptionalA2AError):
187 pass
188
189 class TaskNotFoundError(_OptionalA2AError):
190 pass
191
192 class UnsupportedOperationError(_OptionalA2AError):
193 pass
194
195 def MessageToDict(*args: Any, **kwargs: Any) -> Any:
196 raise RuntimeError("A2A HTTP dependencies are not installed")
197
198 def ParseDict(*args: Any, **kwargs: Any) -> Any:
199 raise RuntimeError("A2A HTTP dependencies are not installed")
200
201 Starlette = cast(Any, None)
202
203
204def _require_a2a_sdk() -> None:
205 if not A2A_HTTP_DEPS_AVAILABLE or AgentCard is None or Starlette is None:
206 raise RuntimeError(
207 "This demo requires the optional A2A HTTP dependencies. "
208 "Install `a2a-sdk[http-server]` and `uvicorn` to run it."
209 )
210
211
212def _status_dict(task: Mapping[str, object]) -> Mapping[str, object]:
213 status = task.get("status")
214 if isinstance(status, Mapping):
215 return status
216 return {}
217
218
219class A2APaymentRequestHandler(RequestHandler):
220 def __init__(self, service: A2APaymentTaskService) -> None:
221 self._service = service
222
223 async def on_get_task(
224 self,
225 params: GetTaskRequest,
226 context: ServerCallContext,
227 ) -> Task | None:
228 del context
229 task = self._service.get_task(params.id)
230 if task is None:
231 return None
232 return _parse_proto(task, Task)
233
234 async def on_list_tasks(
235 self, params: ListTasksRequest, context: ServerCallContext
236 ) -> ListTasksResponse:
237 del context
238 state = (
239 TaskState.Name(params.status)
240 if params.status != TaskState.TASK_STATE_UNSPECIFIED
241 else None
242 )
243 context_id = params.context_id or None
244 tasks = self._service.list_tasks(
245 context_id=context_id,
246 state=state,
247 include_artifacts=params.include_artifacts
248 if params.HasField("include_artifacts")
249 else False,
250 )
251 return ListTasksResponse(
252 tasks=[_parse_proto(task, Task) for task in tasks],
253 next_page_token="",
254 page_size=len(tasks),
255 total_size=len(tasks),
256 )
257
258 async def on_cancel_task(
259 self,
260 params: CancelTaskRequest,
261 context: ServerCallContext,
262 ) -> Task | None:
263 del context
264 try:
265 task = self._service.cancel_task(params.id)
266 except ValueError as exc:
267 raise TaskNotCancelableError(message=str(exc)) from exc
268 if task is None:
269 return None
270 return _parse_proto(task, Task)
271
272 async def on_message_send(
273 self,
274 params: SendMessageRequest,
275 context: ServerCallContext,
276 ) -> Task | Message:
277 del context
278 message = MessageToDict(params.message, preserving_proto_field_name=False)
279 try:
280 task = self._service.send_message(message)
281 except KeyError as exc:
282 raise TaskNotFoundError from exc
283 except RuntimeError as exc:
284 raise UnsupportedOperationError(message=str(exc)) from exc
285 except ValueError as exc:
286 raise InvalidParamsError(message=str(exc)) from exc
287 return _parse_proto(task, Task)
288
289 async def on_message_send_stream(
290 self,
291 params: SendMessageRequest,
292 context: ServerCallContext,
293 ) -> AsyncGenerator[Event]:
294 del context
295 message = MessageToDict(params.message, preserving_proto_field_name=False)
296 task_id = message.get("taskId")
297 payload = extract_payload(message)
298
299 if not isinstance(task_id, str):
300 try:
301 task = self._service.send_message(message)
302 except ValueError as exc:
303 raise InvalidParamsError(message=str(exc)) from exc
304 except RuntimeError as exc:
305 raise UnsupportedOperationError(message=str(exc)) from exc
306 yield _parse_proto(task, Task)
307 return
308
309 current_task = self._service.get_task(task_id)
310 if current_task is None:
311 raise TaskNotFoundError
312
313 if isinstance(payload, Mapping):
314 payment_authorization = payload.get("paymentAuthorization")
315 else:
316 payment_authorization = None
317
318 if isinstance(payment_authorization, Mapping):
319 metadata = current_task.get("metadata", {})
320 quote = metadata.get("quote") if isinstance(metadata, Mapping) else None
321 if isinstance(quote, Mapping):
322 errors = validate_payment_authorization(payment_authorization, quote)
323 else:
324 errors = ["quoted task metadata is missing"]
325
326 if not errors:
327 working = self._service.build_streaming_working_task(
328 task_id=task_id,
329 context_id=str(current_task["contextId"]),
330 message="Payment authorization accepted. Executing storage task.",
331 )
332 yield _parse_proto(working, Task)
333 self._service.send_message(message)
334 final_task = self._service.complete_working_task(task_id)
335 if final_task is None:
336 raise TaskNotFoundError
337 artifacts = final_task.get("artifacts")
338 if isinstance(artifacts, list):
339 for artifact in artifacts:
340 if not isinstance(artifact, Mapping):
341 continue
342 yield _build_artifact_update(
343 task_id=task_id,
344 context_id=str(final_task["contextId"]),
345 artifact_dict=artifact,
346 )
347 yield _build_status_update(
348 task_id=task_id,
349 context_id=str(final_task["contextId"]),
350 task_dict=final_task,
351 )
352 return
353
354 try:
355 task = self._service.send_message(message)
356 except ValueError as exc:
357 raise InvalidParamsError(message=str(exc)) from exc
358 except RuntimeError as exc:
359 raise UnsupportedOperationError(message=str(exc)) from exc
360 yield _parse_proto(task, Task)
361
362 async def on_create_task_push_notification_config(
363 self,
364 params: TaskPushNotificationConfig,
365 context: ServerCallContext,
366 ) -> TaskPushNotificationConfig:
367 del params, context
368 raise PushNotificationNotSupportedError
369
370 async def on_get_task_push_notification_config(
371 self,
372 params: GetTaskPushNotificationConfigRequest,
373 context: ServerCallContext,
374 ) -> TaskPushNotificationConfig:
375 del params, context
376 raise PushNotificationNotSupportedError
377
378 async def on_subscribe_to_task(
379 self,
380 params: SubscribeToTaskRequest,
381 context: ServerCallContext,
382 ) -> AsyncGenerator[Event]:
383 del context
384 task = self._service.get_task(params.id)
385 if task is None:
386 raise TaskNotFoundError
387 state = str(_status_dict(task).get("state", ""))
388 if state in {
389 "TASK_STATE_COMPLETED",
390 "TASK_STATE_FAILED",
391 "TASK_STATE_CANCELED",
392 "TASK_STATE_REJECTED",
393 }:
394 raise UnsupportedOperationError(
395 message="The task is in a terminal state and cannot be subscribed to"
396 )
397 yield _parse_proto(task, Task)
398
399 async def on_list_task_push_notification_configs(
400 self,
401 params: ListTaskPushNotificationConfigsRequest,
402 context: ServerCallContext,
403 ) -> ListTaskPushNotificationConfigsResponse:
404 del params, context
405 raise PushNotificationNotSupportedError
406
407 async def on_delete_task_push_notification_config(
408 self,
409 params: DeleteTaskPushNotificationConfigRequest,
410 context: ServerCallContext,
411 ) -> None:
412 del params, context
413 raise PushNotificationNotSupportedError
414
415 async def on_get_extended_agent_card(
416 self,
417 params: GetExtendedAgentCardRequest,
418 context: ServerCallContext,
419 ) -> Any:
420 del params, context
421 raise UnsupportedOperationError(
422 message="This demo does not provide an extended agent card"
423 )
424
425
426def build_task_service(backend: str) -> A2APaymentTaskService:
427 if backend == "simulation":
428 return A2APaymentTaskService()
429 if backend == "synapse":
430 return A2APaymentTaskService(
431 execution_backend=SynapseNodeBridgeBackend.from_env()
432 )
433 raise ValueError(f"Unsupported backend: {backend}")
434
435
436def create_a2a_http_app(
437 *,
438 backend: str = "simulation",
439 public_base_url: str = "http://127.0.0.1:9999",
440 rpc_path: str = "/rpc",
441) -> Starlette:
442 _require_a2a_sdk()
443 service = build_task_service(backend)
444 handler = A2APaymentRequestHandler(service)
445 agent_card_dict = service.build_agent_card(
446 interface_url=f"{public_base_url}{rpc_path}",
447 protocol_binding="JSONRPC",
448 streaming=True,
449 )
450 agent_card = _parse_proto(agent_card_dict, AgentCard)
451 routes = [
452 *create_agent_card_routes(agent_card),
453 *create_jsonrpc_routes(handler, rpc_path),
454 ]
455 return cast(Any, Starlette)(routes=routes)
456
457
458def _parse_proto(payload: Mapping[str, object], proto_cls: Any) -> Any:
459 return ParseDict(dict(payload), proto_cls())
460
461
462def _build_artifact_update(
463 *, task_id: str, context_id: str, artifact_dict: Mapping[str, object]
464) -> TaskArtifactUpdateEvent:
465 artifact = _parse_proto(artifact_dict, Artifact)
466 event = TaskArtifactUpdateEvent(task_id=task_id, context_id=context_id)
467 event.artifact.CopyFrom(artifact)
468 event.last_chunk = True
469 return event
470
471
472def _build_status_update(
473 *, task_id: str, context_id: str, task_dict: Mapping[str, object]
474) -> TaskStatusUpdateEvent:
475 task = _parse_proto(task_dict, Task)
476 event = TaskStatusUpdateEvent(task_id=task_id, context_id=context_id)
477 event.status.CopyFrom(task.status)
478 return event
479
480
481def main() -> None:
482 _require_a2a_sdk()
483 parser = argparse.ArgumentParser(
484 description=(
485 "Runs the A2A HTTP/JSON-RPC + SSE facade for the Filecoin payment demo. "
486 "Use --backend synapse for real Synapse-backed execution."
487 )
488 )
489 parser.add_argument("--host", default="127.0.0.1")
490 parser.add_argument("--port", default=9999, type=int)
491 parser.add_argument(
492 "--backend",
493 choices=("simulation", "synapse"),
494 default="simulation",
495 help="execution backend for storage tasks",
496 )
497 parser.add_argument(
498 "--public-base-url",
499 default=None,
500 help="base URL to advertise in the Agent Card",
501 )
502 parser.add_argument(
503 "--rpc-path",
504 default="/rpc",
505 help="JSON-RPC endpoint path exposed by the A2A server",
506 )
507 args = parser.parse_args()
508
509 public_base_url = args.public_base_url or f"http://{args.host}:{args.port}"
510 app = create_a2a_http_app(
511 backend=args.backend,
512 public_base_url=public_base_url,
513 rpc_path=args.rpc_path,
514 )
515
516 try:
517 uvicorn = importlib.import_module("uvicorn")
518 except ImportError as exc: # pragma: no cover - optional runtime dependency
519 raise SystemExit("Install `uvicorn` to run the HTTP A2A demo server.") from exc
520
521 uvicorn.run(app, host=args.host, port=args.port)
522
523
524if __name__ == "__main__": # pragma: no cover
525 main()