# -*- coding: utf-8 -*-
"""
A2A Protocol Adapter for FastAPI
This module provides the default A2A (Agent-to-Agent) protocol adapter
implementation for FastAPI applications. It handles agent card configuration,
wellknown endpoint setup, and task management.
"""
import os
import logging
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urljoin
from a2a.server.apps import A2AFastAPIApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
AgentCapabilities,
AgentCard,
AgentSkill,
)
from a2a.utils import AGENT_CARD_WELL_KNOWN_PATH
from fastapi import FastAPI
from pydantic import ConfigDict, BaseModel, field_validator
from agentscope_runtime.engine.deployers.utils.net_utils import (
get_first_non_loopback_ip,
)
from .a2a_agent_adapter import A2AExecutor
from .a2a_registry import (
A2ARegistry,
A2ATransportsProperties,
)
# NOTE: Do NOT import NacosRegistry at module import time to avoid
# forcing an optional dependency on environments that don't have nacos
# SDK installed. Registry is optional: users must explicitly provide a
# registry instance if needed.
# from .nacos_a2a_registry import NacosRegistry
from ..protocol_adapter import ProtocolAdapter
logger = logging.getLogger(__name__)
A2A_JSON_RPC_URL = "/a2a"
DEFAULT_WELLKNOWN_PATH = AGENT_CARD_WELL_KNOWN_PATH
DEFAULT_TASK_TIMEOUT = 60
DEFAULT_TASK_EVENT_TIMEOUT = 10
DEFAULT_TRANSPORT = "JSONRPC"
DEFAULT_INPUT_OUTPUT_MODES = ["text"]
PORT = int(os.getenv("PORT", "8080"))
AGENT_VERSION = "1.0.0"
[docs]
class AgentCardWithRuntimeConfig(BaseModel):
"""Runtime configuration wrapper for AgentCard.
Combines AgentCard (protocol fields) with runtime-specific settings
(host, port, registry, timeouts, etc.) in a single configuration object.
Attributes:
agent_card: AgentCard object or dict containing protocol fields
(name, description, url, version, skills, etc.)
host: Host address for A2A endpoints (default: auto-detected)
port: Port for A2A endpoints (default: from PORT env var or 8080)
registry: List of A2A registry instances for service discovery
task_timeout: Task completion timeout in seconds (default: 60)
task_event_timeout: Task event timeout in seconds (default: 10)
wellknown_path: Wellknown endpoint path
(default: /.wellknown/agent-card.json)
"""
agent_card: Optional[Union[AgentCard, Dict[str, Any]]] = None
host: Optional[str] = None
port: int = PORT
registry: Optional[Union[A2ARegistry, List[A2ARegistry]]] = None
task_timeout: Optional[int] = DEFAULT_TASK_TIMEOUT
task_event_timeout: Optional[int] = DEFAULT_TASK_EVENT_TIMEOUT
wellknown_path: Optional[str] = DEFAULT_WELLKNOWN_PATH
[docs]
@field_validator("registry", mode="before")
@classmethod
def normalize_registry(cls, v):
"""Normalize registry to list format."""
if v is None:
return None
if isinstance(v, list):
return v
# Single registry instance -> convert to list
return [v]
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra="allow",
)
[docs]
class A2AFastAPIDefaultAdapter(ProtocolAdapter):
"""Default A2A protocol adapter for FastAPI applications.
Provides comprehensive configuration options for A2A protocol including
agent card settings, task timeouts, wellknown endpoints, and transport
configurations. All configuration items have sensible defaults but can
be overridden by users.
"""
[docs]
def __init__(
self,
agent_name: str,
agent_description: str,
a2a_config: Optional[AgentCardWithRuntimeConfig] = None,
**kwargs: Any,
) -> None:
"""Initialize A2A protocol adapter.
Args:
agent_name: Agent name
(fallback if not in a2a_config.agent_card)
agent_description: Agent description
(fallback if not in a2a_config.agent_card)
a2a_config: Runtime configuration with AgentCard and runtime
settings
**kwargs: Additional arguments for parent class
"""
super().__init__(**kwargs)
self._json_rpc_path = kwargs.get("json_rpc_path", A2A_JSON_RPC_URL)
if a2a_config is None:
a2a_config = AgentCardWithRuntimeConfig()
self._a2a_config = a2a_config
# Extract name/description from agent_card, fallback to parameters
agent_card_name = None
agent_card_description = None
if a2a_config.agent_card is not None:
if isinstance(a2a_config.agent_card, dict):
agent_card_name = a2a_config.agent_card.get("name")
agent_card_description = a2a_config.agent_card.get(
"description",
)
elif isinstance(a2a_config.agent_card, AgentCard):
agent_card_name = getattr(a2a_config.agent_card, "name", None)
agent_card_description = getattr(
a2a_config.agent_card,
"description",
None,
)
self._agent_name = (
agent_card_name if agent_card_name is not None else agent_name
)
self._agent_description = (
agent_card_description
if agent_card_description is not None
else agent_description
)
self._host = a2a_config.host or get_first_non_loopback_ip()
self._port = a2a_config.port
# Normalize registry to list
registry = a2a_config.registry
if registry is None:
self._registry: List[A2ARegistry] = []
elif isinstance(registry, A2ARegistry):
self._registry = [registry]
elif isinstance(registry, list):
if not all(isinstance(r, A2ARegistry) for r in registry):
error_msg = (
"[A2A] Invalid registry list: all items must be "
"A2ARegistry instances"
)
logger.error(error_msg)
raise TypeError(error_msg)
self._registry = registry
self._task_timeout = a2a_config.task_timeout or DEFAULT_TASK_TIMEOUT
self._task_event_timeout = (
a2a_config.task_event_timeout or DEFAULT_TASK_EVENT_TIMEOUT
)
self._wellknown_path = (
a2a_config.wellknown_path or DEFAULT_WELLKNOWN_PATH
)
[docs]
def add_endpoint(
self,
app: FastAPI,
func: Callable,
**kwargs: Any,
) -> None:
"""Add A2A protocol endpoints to FastAPI application.
Args:
app: FastAPI application instance
func: Agent execution function
**kwargs: Additional arguments for registry registration
"""
request_handler = DefaultRequestHandler(
agent_executor=A2AExecutor(func=func),
task_store=InMemoryTaskStore(),
)
agent_card = self.get_agent_card(app=app)
server = A2AFastAPIApplication(
agent_card=agent_card,
http_handler=request_handler,
)
server.add_routes_to_app(
app,
rpc_url=self._json_rpc_path,
agent_card_url=self._wellknown_path,
)
if self._registry:
self._register_with_all_registries(
agent_card=agent_card,
app=app,
)
def _register_with_all_registries(
self,
agent_card: AgentCard,
app: FastAPI,
) -> None:
"""Register agent with all configured registry instances.
Registration failures are logged but do not block startup.
Args:
agent_card: The generated AgentCard
app: FastAPI application instance
"""
a2a_transports_properties = self._build_a2a_transports_properties(
app=app,
)
for registry in self._registry:
registry_name = registry.registry_name()
try:
logger.info(
"[A2A] Registering with registry: %s",
registry_name,
)
registry.register(
agent_card=agent_card,
a2a_transports_properties=a2a_transports_properties,
)
logger.info(
"[A2A] Successfully registered with registry: %s",
registry_name,
)
except Exception as e:
logger.warning(
"[A2A] Failed to register with registry %s: %s. "
"This will not block runtime startup.",
registry_name,
str(e),
exc_info=True,
)
def _build_a2a_transports_properties(
self,
app: FastAPI,
) -> List[A2ATransportsProperties]:
"""Build A2ATransportsProperties from runtime configuration.
Args:
app: FastAPI application instance
Returns:
List of A2ATransportsProperties instances
"""
transports_list = []
path = getattr(app, "root_path", "")
json_rpc = urljoin(
path.rstrip("/") + "/",
self._json_rpc_path.lstrip("/"),
)
default_transport = A2ATransportsProperties(
host=self._host,
port=self._port,
path=json_rpc,
support_tls=False,
extra={},
transport_type=DEFAULT_TRANSPORT,
)
transports_list.append(default_transport)
return transports_list
def _get_agent_card_field(
self,
field_name: str,
default: Any = None,
) -> Any:
"""Extract field from agent_card (dict or AgentCard object).
Args:
field_name: Field name to retrieve
default: Default value if not found
Returns:
Field value or default
"""
agent_card = self._a2a_config.agent_card
if agent_card is None:
return default
if isinstance(agent_card, dict):
return agent_card.get(field_name, default)
else:
# AgentCard object
return getattr(agent_card, field_name, default)
[docs]
def get_agent_card(
self,
app: Optional[FastAPI] = None, # pylint: disable=unused-argument
) -> AgentCard:
"""Build AgentCard from configuration.
Constructs AgentCard from agent_card field (dict or AgentCard),
filling missing fields with defaults and computed values.
Args:
app: FastAPI app instance (for URL generation)
Returns:
Configured AgentCard instance
"""
# Generate URL if not provided
url = self._get_agent_card_field("url")
if url is None:
path = getattr(app, "root_path", "")
json_rpc = urljoin(
path.rstrip("/") + "/",
self._json_rpc_path.lstrip("/"),
).lstrip("/")
base_url = (
f"{self._host}:{self._port}"
if self._host.startswith(("http://", "https://"))
else f"http://{self._host}:{self._port}"
)
url = f"{base_url}/{json_rpc}"
# Initialize from agent_card
card_kwargs = {}
# Set required fields
card_kwargs["name"] = self._get_agent_card_field(
"name",
self._agent_name,
)
card_kwargs["description"] = self._get_agent_card_field(
"description",
self._agent_description,
)
card_kwargs["url"] = url
card_kwargs["version"] = self._get_agent_card_field(
"version",
AGENT_VERSION,
)
# Set defaults for required fields
card_kwargs["preferred_transport"] = self._get_agent_card_field(
"preferred_transport",
DEFAULT_TRANSPORT,
)
card_kwargs["additional_interfaces"] = self._get_agent_card_field(
"additional_interfaces",
[],
)
card_kwargs["default_input_modes"] = self._get_agent_card_field(
"default_input_modes",
DEFAULT_INPUT_OUTPUT_MODES,
)
card_kwargs["default_output_modes"] = self._get_agent_card_field(
"default_output_modes",
DEFAULT_INPUT_OUTPUT_MODES,
)
card_kwargs["skills"] = self._get_agent_card_field(
"skills",
[
AgentSkill(
id="dialog",
name="Natural Language Dialog Skill",
description=(
"Enables natural language conversation and dialogue "
"with users"
),
tags=["natural language", "dialog", "conversation"],
examples=[
"Hello, how are you?",
"Can you help me with something?",
],
),
],
)
# Runtime-managed AgentCard fields: user values are ignored
if self._get_agent_card_field("capabilities") is not None:
logger.warning(
"[A2A] Ignoring user-provided AgentCard.capabilities; "
"runtime controls this field.",
)
card_kwargs["capabilities"] = AgentCapabilities(
streaming=False,
push_notifications=False,
state_transition_history=False,
)
if self._get_agent_card_field("protocol_version") is not None:
logger.warning(
"[A2A] Ignoring user-provided AgentCard.protocol_version; "
"runtime controls this field.",
)
if (
self._get_agent_card_field(
"supports_authenticated_extended_card",
)
is not None
):
logger.warning(
"[A2A] Ignoring user-provided "
"AgentCard.supports_authenticated_extended_card; "
"runtime controls this field.",
)
if self._get_agent_card_field("signatures") is not None:
logger.warning(
"[A2A] Ignoring user-provided AgentCard.signatures; "
"runtime controls this field.",
)
# Add optional fields
for field in [
"provider",
"documentation_url",
"icon_url",
"security_schemes",
"security",
]:
value = self._get_agent_card_field(field)
if value is None:
continue
# Backward compatibility: allow simple string provider and map it
# to AgentProvider.organization
if field == "provider" and isinstance(value, str):
card_kwargs[field] = {
"organization": value,
"url": url,
}
else:
card_kwargs[field] = value
return AgentCard(**card_kwargs)