Source code for agentscope_runtime.engine.deployers.adapter.responses.response_api_protocol_adapter

# -*- coding: utf-8 -*-
import asyncio
import json
import logging
import traceback
from typing import Callable, Dict, Any, Optional, AsyncGenerator
from uuid import uuid4

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, Response, StreamingResponse

# OpenAI Response API official type definition
from openai.types.responses import ResponseCreateParams

from .response_api_adapter_utils import ResponsesAdapter
from .response_api_agent_adapter import ResponseAPIExecutor
from ..protocol_adapter import ProtocolAdapter
from ....schemas.agent_schemas import AgentRequest, BaseResponse

logger = logging.getLogger(__name__)

RESPONSES_ENDPOINT_PATH = "/compatible-mode/v1/responses"
SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",  # disable Nginx buffering
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "POST, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization",
}
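
# For reference, every SSE frame emitted by this adapter has the shape
# "event: <type>\ndata: <json>\n\n". A failure frame, for example, looks
# like this (field values illustrative):
#
#   event: response.failed
#   data: {"id": "resp_...", "object": "response", "status": "failed", ...}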


class ResponseAPIDefaultAdapter(ProtocolAdapter):
    """Protocol adapter that exposes an OpenAI Responses-API-compatible
    endpoint on a FastAPI app and bridges requests to an agent executor."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._executor: Optional[ResponseAPIExecutor] = None
        self._timeout = kwargs.get("timeout", 300)  # seconds
        self._max_concurrent_requests = kwargs.get(
            "max_concurrent_requests",
            100,
        )
        self._semaphore = asyncio.Semaphore(self._max_concurrent_requests)
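
    # Construction sketch: the keyword names below are exactly the kwargs
    # read in __init__; any other deployment detail is an assumption.
    #
    #   adapter = ResponseAPIDefaultAdapter(
    #       timeout=120,                 # per-chunk stream timeout, seconds
    #       max_concurrent_requests=10,  # size of the request semaphore
    #   )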

    async def _handle_requests(self, request: Request) -> Response:
        """
        Handle an OpenAI Response API request.

        Args:
            request: FastAPI Request object

        Returns:
            Response: an SSE StreamingResponse when ``stream`` is true,
            otherwise a JSONResponse with the final response object
        """
        # Concurrency guard per request
        await self._semaphore.acquire()
        request_id = f"resp_{uuid4()}"
        logger.info("[ResponseAPI] start request_id=%s", request_id)
        try:
            # Parse request body
            request_data = await request.json()
            stream = request_data.get("stream", False)

            # Stream or non-stream
            if stream:
                # Return SSE streaming response with timeout control
                return StreamingResponse(
                    self._generate_stream_response_with_timeout(
                        request=request_data,
                        request_id=request_id,
                    ),
                    media_type="text/event-stream",
                    headers=SSE_HEADERS,
                )

            # Non-stream JSON response
            response_obj = await self._collect_non_stream_response(
                request=request_data,
                request_id=request_id,
            )
            return JSONResponse(content=response_obj)
        except HTTPException:
            raise
        except Exception as e:
            logger.error(
                f"Unexpected error in _handle_requests: {e}\n"
                f"{traceback.format_exc()}",
            )
            raise HTTPException(
                status_code=500,
                detail="Internal server error",
            ) from e
        finally:
            # Note: for a streaming request this releases the slot when the
            # StreamingResponse object is returned, not when the stream
            # finishes producing chunks.
            self._semaphore.release()
            logger.info("[ResponseAPI] end request_id=%s", request_id)

    def _convert_response_request_to_agent_request(
        self,
        response_request: ResponseCreateParams,
    ) -> AgentRequest:
        """
        Convert an OpenAI Response API request to an Agent API request.

        Args:
            response_request: OpenAI Response API request

        Returns:
            AgentRequest: Agent API request
        """
        # Convert with ResponsesAdapter; ResponseCreateParams is a TypedDict,
        # so coerce it to a plain dict rather than calling model_dump().
        adapter = ResponsesAdapter()
        agent_request = adapter.convert_responses_request_to_agent_request(
            dict(response_request),
        )
        return agent_request

    async def _collect_non_stream_response(
        self,
        request: Dict,
        request_id: str,
    ) -> Dict[str, Any]:
        """
        Run the agent and build a non-streaming OpenAI Responses API object.

        This collects events until completion and returns a single JSON
        object representing the final response.
        """
        last_agent_response: Optional[BaseResponse] = None
        try:
            async for event in self._executor.execute(request):
                last_agent_response = event
        except Exception:
            # Map to the Responses API error shape
            return {
                "id": request_id,
                "object": "response",
                "status": "failed",
                "error": {
                    "code": "non_stream_error",
                    "message": "Failed to build non-stream response",
                },
            }

        if not last_agent_response:
            # No response produced
            return {
                "id": request_id,
                "object": "response",
                "status": "failed",
                "error": {
                    "code": "empty_response",
                    "message": "No response produced by agent",
                },
            }

        # Convert to dict for JSONResponse
        return last_agent_response.model_dump(exclude_none=True)

    async def _generate_stream_response_with_timeout(
        self,
        request: Dict,
        request_id: str,
    ) -> AsyncGenerator[str, None]:
        """
        Generate SSE streaming response with a per-chunk timeout.

        Args:
            request: Responses API request
            request_id: unique request identifier

        Yields:
            str: SSE data chunks
        """
        try:
            # Enforce the timeout on each chunk with asyncio.wait_for so a
            # stalled agent cannot hang the stream indefinitely.
            generator = self._generate_stream_response(
                request=request,
                request_id=request_id,
            )
            while True:
                try:
                    chunk = await asyncio.wait_for(
                        generator.__anext__(),
                        timeout=self._timeout,
                    )
                except StopAsyncIteration:
                    break
                yield chunk
        except asyncio.TimeoutError:
            logger.error(f"Request timeout after {self._timeout} seconds")
            # Send timeout error event
            timeout_event = {
                "id": request_id,
                "object": "response",
                "status": "failed",
                "error": {
                    "code": "timeout",
                    "message": f"Request timed out after "
                    f"{self._timeout} seconds",
                },
            }
            yield (
                f"event: response.failed\n"
                f"data: {json.dumps(timeout_event)}\n\n"
            )
        except Exception as e:
            logger.error(
                f"Error in timeout-controlled stream: {e}\n"
                f"{traceback.format_exc()}",
            )
            # Send error event
            error_event = {
                "id": request_id,
                "object": "response",
                "status": "failed",
                "error": {
                    "code": "stream_error",
                    "message": str(e),
                },
            }
            yield (
                f"event: response.failed\n"
                f"data: {json.dumps(error_event)}\n\n"
            )

    async def _generate_stream_response(
        self,
        request: Dict,
        request_id: str,
    ) -> AsyncGenerator[str, None]:
        """
        Generate SSE streaming response.

        Args:
            request: Responses API request
            request_id: unique request identifier

        Yields:
            str: SSE data chunks
        """
        try:
            # Handle streaming events
            async for event in self._executor.execute(request):
                try:
                    if event:
                        # Serialize event
                        event_data = event.model_dump(exclude_none=True)
                        data = json.dumps(event_data, ensure_ascii=False)

                        # Set SSE event type based on event type
                        event_type = event_data.get("type", "message")
                        yield f"event: {event_type}\ndata: {data}\n\n"
                except Exception as e:
                    logger.error(
                        f"Error processing event: {e}\n"
                        f"{traceback.format_exc()}",
                    )
                    # Send error event and stop the stream
                    error_event = {
                        "id": request_id,
                        "object": "response",
                        "status": "failed",
                        "error": {
                            "code": "processing_error",
                            "message": str(e),
                        },
                    }
                    yield (
                        f"event: response.failed\n"
                        f"data: {json.dumps(error_event)}\n\n"
                    )
                    break
        except Exception as e:
            logger.error(
                f"Error in stream generation: {e}\n{traceback.format_exc()}",
            )
            # Send error event
            error_event = {
                "id": request_id,
                "object": "response",
                "status": "failed",
                "error": {
                    "code": "stream_error",
                    "message": str(e),
                },
            }
            yield (
                f"event: response.failed\n"
                f"data: {json.dumps(error_event)}\n\n"
            )
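
    # Client-side sketch for consuming the stream above. httpx and the
    # request-body fields are assumptions about the caller, not part of
    # this module:
    #
    #   import httpx
    #
    #   with httpx.stream(
    #       "POST",
    #       "http://localhost:8000" + RESPONSES_ENDPOINT_PATH,
    #       json={"input": "hello", "stream": True},
    #       timeout=None,
    #   ) as resp:
    #       for line in resp.iter_lines():
    #           if line.startswith("data: "):
    #               print(json.loads(line[len("data: "):]))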

    def add_endpoint(self, app: FastAPI, func: Callable, **kwargs) -> None:
        """
        Add endpoint to FastAPI app.

        Args:
            app: FastAPI application instance
            func: handler function
            **kwargs: extra options
        """
        # Create executor
        self._executor = ResponseAPIExecutor(func=func)

        # Register route
        app.post(
            RESPONSES_ENDPOINT_PATH,
            openapi_extra={
                "requestBody": {
                    "content": {
                        "application/json": {
                            "schema": {
                                "$ref": "#/components/schemas/ResponseAPI",
                            },
                        },
                    },
                    "required": True,
                    "description": "OpenAI Response API compatible "
                    "request format",
                },
            },
        )(self._handle_requests)
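
A minimal wiring sketch under stated assumptions: agent_func is a placeholder
for whatever callable ResponseAPIExecutor wraps (its contract is not shown in
this module), and the uvicorn command is illustrative.

from fastapi import FastAPI

app = FastAPI()
adapter = ResponseAPIDefaultAdapter(timeout=120, max_concurrent_requests=10)


def agent_func(*args, **kwargs):
    # Placeholder: the real handler contract is defined by
    # ResponseAPIExecutor, which wraps this callable.
    raise NotImplementedError


adapter.add_endpoint(app, func=agent_func)

# Serve with, e.g.:  uvicorn my_module:app --port 8000
# The endpoint is then available at POST /compatible-mode/v1/responses.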