Source code for agentscope_runtime.engine.deployers.adapter.a2a.a2a_agent_adapter
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument
import logging
import traceback
from typing import Callable
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import UnsupportedOperationError
from a2a.utils.errors import ServerError
from agentscope_runtime.engine.deployers.adapter.a2a.a2a_adapter_utils import (
agent_message_to_a2a_message,
)
from agentscope_runtime.engine.schemas.agent_schemas import (
AgentRequest,
RunStatus,
)
logger = logging.getLogger(__name__)
[docs]
class A2AExecutor(AgentExecutor):
[docs]
def __init__(self, func: Callable, **kwargs):
self._func = func
[docs]
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
request = AgentRequest.model_validate(
{
"session_id": context.context_id,
"response_id": context.task_id,
"input": [
{
"role": "user",
"content": [
{
"type": "text",
"text": query,
},
],
},
],
"stream": True,
},
)
try:
async for event in self._func(request=request):
if event.object == "response":
if event.status == RunStatus.Completed:
if event.output:
message = event.output[len(event.output) - 1]
a2a_message = agent_message_to_a2a_message(message)
await event_queue.enqueue_event(a2a_message)
except Exception as e:
logger.error(f"An error occurred: {e}, {traceback.format_exc()}")
[docs]
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
raise ServerError(error=UnsupportedOperationError())