Source code for agentscope_runtime.engine.deployers.adapter.responses.response_api_adapter_utils

# -*- coding: utf-8 -*-
# pylint: disable=too-many-branches,too-many-return-statements,line-too-long

"""
Responses Adapter

Bidirectional protocol converter: Responses API ↔ Agent API

Conversion functions:
1. Responses API request → Agent API request
2. Agent API event → Responses API event
3. Support for streaming and non-streaming conversion
"""

import time
import uuid
from typing import Any, Dict, List, Optional, Union

# OpenAI Responses API Types
from openai.types.responses import (
    Response,
    ResponseCompletedEvent,
    ResponseContentPartAddedEvent,
    ResponseContentPartDoneEvent,
    ResponseCreatedEvent,
    ResponseErrorEvent,
    ResponseFailedEvent,
    ResponseFunctionCallArgumentsDeltaEvent,
    ResponseFunctionCallArgumentsDoneEvent,
    ResponseInProgressEvent,
    ResponseOutputItemAddedEvent,
    ResponseOutputItemDoneEvent,
    ResponseReasoningTextDeltaEvent,
    ResponseReasoningTextDoneEvent,
    ResponseRefusalDeltaEvent,
    ResponseRefusalDoneEvent,
    ResponseStatus,
    ResponseStreamEvent,
    ResponseTextDeltaEvent,
    ResponseTextDoneEvent,
)
from openai.types.responses.response_function_tool_call import (
    ResponseFunctionToolCall,
)
from openai.types.responses.response_mcp_call_completed_event import (
    ResponseMcpCallCompletedEvent,
)
from openai.types.responses.response_mcp_call_in_progress_event import (
    ResponseMcpCallInProgressEvent,
)
from openai.types.responses.response_mcp_list_tools_completed_event import (
    ResponseMcpListToolsCompletedEvent,
)
from openai.types.responses.response_mcp_list_tools_in_progress_event import (
    ResponseMcpListToolsInProgressEvent,
)
from openai.types.responses.response_output_item import (
    McpCall,
    McpListTools,
    McpListToolsTool,
    ResponseOutputItem,
)
from openai.types.responses.response_output_message import (
    ResponseOutputMessage,
)
from openai.types.responses.response_output_refusal import (
    ResponseOutputRefusal,
)
from openai.types.responses.response_output_text import ResponseOutputText
from openai.types.responses.response_reasoning_item import (
    Content as ReasoningContent,
)
from openai.types.responses.response_reasoning_item import (
    ResponseReasoningItem,
)

from agentscope_runtime.engine.schemas.agent_schemas import (
    AgentRequest,
    BaseResponse,
    Content,
    ContentType,
    DataContent,
    Event,
    Message,
    MessageType,
    RefusalContent,
    Role,
    RunStatus,
    TextContent,
    ToolCall,
    ToolCallOutput,
    FunctionTool,
    Tool,
    ImageContent,
    AudioContent,
    FileContent,
)


# Agent API Types


[docs] class ResponsesAdapter: """ Bidirectional protocol converter: Responses API ↔ Agent API Main functions: 1. Convert Responses API request → Agent API request 2. Convert Agent API event → Responses API event 3. Convert Responses API event stream → Agent API event stream 4. Handle various message types (text, tool calls, reasoning, etc.) """
[docs] def __init__(self) -> None: self.sequence_counter = 0 # Temporary storage structure: key is message id, value is dict # containing message_type and content_index_list self._message_content_index_map: Dict = {} # Additional adaptation work for adapting Agent API RAG plugin calls # to Responses API FileSearch calls self._file_search_call_map: Optional[Dict] = None self._output_index: int = 0 self._output: List[ResponseOutputItem] = []
[docs] def convert_agent_response_to_responses( self, agent_response: BaseResponse, ): # First convert Response response = self._convert_agent_response_responses_api( agent_response=agent_response, ) # Convert Message messages = self._convert_agent_message_to_responses( agent_message_list=agent_response.output, ) response.output = messages # Convert Content return response
[docs] def convert_status_to_responses(self, agent_status: str) -> ResponseStatus: if agent_status in (RunStatus.Created, RunStatus.Queued): return "queued" elif agent_status == RunStatus.InProgress: return "in_progress" elif agent_status == RunStatus.Completed: return "completed" elif agent_status == RunStatus.Failed: return "failed" elif agent_status == RunStatus.Cancelled: return "cancelled" elif agent_status == RunStatus.Incomplete: return "incomplete" else: return "in_progress"
def _convert_agent_message_to_responses( self, agent_message_list: List[Message], ): messages = [] if agent_message_list: for message in agent_message_list: if message.type == MessageType.MESSAGE: output_message = ( self._convert_message_type_to_output_message( message, ) ) messages.append(output_message) if message.type == MessageType.FUNCTION_CALL: function_call_message = ( self._convert_function_call_to_output_message( message, ) ) messages.append(function_call_message) if message.type == MessageType.MCP_LIST_TOOLS: mcp_list_tools_message = ( self._convert_mcp_list_tools_to_output_message( message, ) ) messages.append(mcp_list_tools_message) if message.type == MessageType.MCP_TOOL_CALL: tool_call_message = ( self._convert_mcp_tool_call_to_output_message( message, ) ) messages.append(tool_call_message) if message.type == MessageType.REASONING: reasoning_message = ( self._convert_reasoning_to_output_message( message, ) ) messages.append(reasoning_message) return messages def _convert_agent_response_responses_api( self, agent_response: BaseResponse, ): status = agent_response.status response_status = self.convert_status_to_responses(status) # Extract real data from agent_event response_id = ( getattr( agent_response, "id", f"resp_{uuid.uuid4().hex[:8]}", ) or f"resp_{uuid.uuid4().hex[:8]}" ) created_at = ( getattr( agent_response, "created_at", time.time(), ) or time.time() ) # Modified: ensure model value returns default empty string when None model = getattr(agent_response, "model", "") or "" parallel_tool_calls = ( getattr( agent_response, "parallel_tool_calls", False, ) or False ) tool_choice = getattr(agent_response, "tool_choice", "auto") or "auto" tools = getattr(agent_response, "tools", []) or [] error = getattr(agent_response, "error", None) # Convert Agent API error to Responses API error responses_error = None if error: responses_error = self._convert_agent_error_to_responses_error( error, ) # Create real Response object using data from agent_event response = Response( id=response_id, status=response_status, created_at=created_at, model=model, object="response", output=[], parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, tools=tools, error=responses_error, # Set converted error ) return response # ===== Request conversion: Responses API → Agent API =====
[docs] def convert_responses_request_to_agent_request( self, responses_request: Dict[str, Any], ) -> AgentRequest: """ Convert Responses API request to Agent API request Implement automatic assignment of fields with the same name, then explicitly handle different field names Args: responses_request: OpenAI ResponseCreateParams Returns: AgentRequest: Agent API request format """ # 1. Extract input messages input_messages = self._extract_input_messages(responses_request) # 2. Automatic assignment of fields with the same name common_fields = self._extract_common_fields( responses_request=responses_request, request_type="agent", ) # 3. Explicit mapping of different field names special_mappings = self._extract_special_mappings(responses_request) # 4. Merge all fields to create AgentRequest agent_request_data = { "input": input_messages, **common_fields, **special_mappings, } return AgentRequest(**agent_request_data)
def _extract_input_messages( self, responses_request: Dict[str, Any], ) -> List[Message]: """Extract and convert input messages""" input_messages = [] # Extract input from responses_request if "input" in responses_request and responses_request["input"]: input_data = responses_request["input"] # Handle Text input (string) type if isinstance(input_data, str): message = self._convert_text_input_to_agent_message(input_data) input_messages.append(message) # Handle Input item list (array) type elif isinstance(input_data, list): for input_item in input_data: # Filter out developer role (not supported yet) if "developer" == input_item.get("role", "user"): continue # Handle dictionary format input if isinstance(input_item, dict): item_type = input_item.get("type") # If there's no type field but has role and content, # consider it as message type if ( not item_type and "role" in input_item and "content" in input_item ): item_type = "message" else: item_type = getattr(input_item, "type", None) if item_type == "message": # Convert to Agent API Message message = self._convert_responses_input_message_to_agent_message( # noqa: E501 input_item, ) input_messages.append(message) elif item_type == "reasoning": # Convert to Agent API Message (type=REASONING) message = self._convert_reasoning_to_message( input_item, ) input_messages.append(message) elif item_type == "custom_tool_call": # Convert to Agent API Message (type=PLUGIN_CALL) message = self._convert_custom_tool_call_to_message( input_item, ) input_messages.append(message) elif item_type == "custom_tool_call_output": # Convert to Agent API Message # (type=PLUGIN_CALL_OUTPUT) message = ( self._convert_custom_tool_call_output_to_message( input_item, ) ) input_messages.append(message) elif item_type == "function_call": # Convert to Agent API Message (type=FUNCTION_CALL) message = self._convert_function_call_to_message( input_item, ) input_messages.append(message) elif item_type == "function_call_output": # Convert to Agent API Message # (type=FUNCTION_CALL_OUTPUT) message = ( self._convert_function_call_output_to_message( input_item, ) ) input_messages.append(message) return input_messages def _convert_text_input_to_agent_message(self, text_input: str) -> Message: """Convert Text input (string) to Agent API Message""" # Create text content text_content = TextContent(type="text", text=text_input, delta=False) # Create message message = Message( role=Role.USER, type="message", content=[text_content], ) return message def _extract_common_fields( self, responses_request: Dict[str, Any], request_type: Optional[str] = "agent", ) -> Dict[str, Any]: """ Intelligently extract fields with the same name, automatically detect and map fields with the same name and type Automatically discover fields with the same name and type in ResponseCreateParams and AgentRequest through reflection mechanism """ common_fields = {} # Get AgentRequest field information request_fields = None if request_type == "workflow": request_fields = self._get_workflow_request_field_info() else: request_fields = self._get_agent_request_field_info() # Iterate through all keys in ResponseCreateParams for attr_name in responses_request.keys(): # Skip private attributes and methods if attr_name.startswith("_"): continue # Get value try: value = responses_request[attr_name] except KeyError: continue # Skip None values if value is None: continue # Check if AgentRequest has field with same name if attr_name not in request_fields: continue # Skip fields that need special handling if attr_name == "input": # input field needs special conversion, not handled here continue if attr_name == "tools": # tools field needs special conversion, convert Responses API # format to Agent API format converted_tools = self._convert_responses_tools_to_agent_tools( value, ) if converted_tools is not None: common_fields[attr_name] = converted_tools continue # Check if types are compatible agent_field_type = request_fields[attr_name] if self._is_type_compatible(value, agent_field_type): common_fields[attr_name] = value return common_fields def _convert_responses_tools_to_agent_tools( self, responses_tools: List[ Dict[ str, Any, ] ], ) -> Optional[List[Any]]: """ Convert Responses API tools format to Agent API tools format Responses API format: [{ "name": "get_weather", "description": "Get the current weather in a given location", "strict": true, "type": "function", "parameters": { "type": "object", "properties": {...}, "required": [...] } }] Agent API format: [{ "type": "function", "function": { "name": "get_weather", "description": "Get the current weather in a given location", "parameters": { "type": "object", "properties": {...}, "required": [...] } } }] """ if not responses_tools or not isinstance(responses_tools, list): return None converted_tools = [] for tool_data in responses_tools: if not isinstance(tool_data, dict): continue # Extract basic information name = tool_data.get("name", "") description = tool_data.get("description", "") tool_type = tool_data.get("type", "function") parameters = tool_data.get("parameters", {}) # Skip invalid tools if not name: continue # Create FunctionTool function_tool = FunctionTool( name=name, description=description, parameters=parameters, ) # Create Agent API Tool agent_tool = Tool(type=tool_type, function=function_tool) converted_tools.append(agent_tool) return converted_tools if converted_tools else None def _get_agent_request_field_info(self) -> Dict[str, type]: """Get AgentRequest field type information""" # Cache field information to avoid repeated calculations if not hasattr(self, "_agent_request_fields_cache"): from typing import get_type_hints # Get AgentRequest type annotations type_hints = get_type_hints(AgentRequest) self._agent_request_fields_cache = type_hints return self._agent_request_fields_cache def _is_type_compatible(self, value: Any, target_type: type) -> bool: """Check if value type is compatible with target type""" if target_type is None: return True # Handle Union types (e.g. Optional[str] = Union[str, None]) if ( hasattr( target_type, "__origin__", ) and target_type.__origin__ is Union ): # Check if compatible with any type in Union for union_type in target_type.__args__: if union_type is type(None): # Skip None type continue if self._is_type_compatible(value, union_type): return True return False # Handle List types if ( hasattr( target_type, "__origin__", ) and target_type.__origin__ is list ): if isinstance(value, list): return True return False # Handle basic types try: # Direct type check if isinstance(value, target_type): return True # Special type conversion check - more strict check if target_type == str and isinstance(value, (str, int, float)): return True if target_type == int and isinstance(value, int): return True # Only allow integers if target_type == float and isinstance(value, (float, int)): return True # Allow integer to float conversion if target_type == bool and isinstance(value, bool): return True # Only allow boolean values except Exception: pass return False def _extract_special_mappings( self, responses_request: Dict[str, Any], ) -> Dict[str, Any]: """ Extract different field names, explicit mapping Different field name mappings: - max_output_tokens -> max_tokens - conversation -> session_id - previous_response_id -> previous_response_id """ special_mappings = {} # conversation -> session_id if "conversation" in responses_request: conversation = responses_request["conversation"] if conversation is not None: # If conversation is an object, extract ID if hasattr(conversation, "id"): special_mappings["session_id"] = conversation.id elif isinstance(conversation, dict) and "id" in conversation: special_mappings["session_id"] = conversation["id"] else: # If conversation is itself an ID string special_mappings["session_id"] = str(conversation) return special_mappings def _convert_responses_input_message_to_agent_message( self, input_message, ) -> Message: """Convert Responses API Input message to Agent API Message""" # Extract message attributes if isinstance(input_message, dict): content = input_message.get("content", []) role = input_message.get("role", "user") msg_type = input_message.get("type", "message") else: content = getattr(input_message, "content", []) role = getattr(input_message, "role", "user") msg_type = getattr(input_message, "type", "message") # Convert content items content_list = [] # If content is string, directly convert to TextContent if isinstance(content, str): text_content = TextContent( type=ContentType.TEXT, text=content, delta=False, ) content_list.append(text_content) # If content is list, iterate and convert each item elif isinstance(content, list): for content_item in content: agent_content = self._convert_content_item_to_agent_content( content_item, ) if agent_content: content_list.append(agent_content) # Create Agent API Message message = Message(role=role, type=msg_type, content=content_list) return message def _convert_reasoning_to_message(self, input_reasoning) -> Message: """Convert Responses API Input reasoning to Agent API Message""" # Extract reasoning attributes if isinstance(input_reasoning, dict): content = input_reasoning.get("content", []) else: content = getattr(input_reasoning, "content", []) # Convert content items to text content content_list = [] # Process content for content_item in content: if isinstance(content_item, dict): content_text = content_item.get("text", "") else: content_text = getattr(content_item, "text", "") if content_text: text_content = TextContent( type=ContentType.TEXT, text=content_text, ) content_list.append(text_content) # Create Agent API Message (type=REASONING) message = Message( role="assistant", # reasoning is usually assistant's reasoning process type=MessageType.REASONING, content=content_list, ) return message def _convert_content_item_to_agent_content(self, content_item): """Convert content item to Agent API Content""" # Handle dictionary or object format content items if isinstance(content_item, dict): content_type = content_item.get("type") content_text = content_item.get("text") content_refusal = content_item.get("refusal") image_url = content_item.get("image_url") # Audio data is in input_audio object input_audio = content_item.get("input_audio", {}) audio_data = ( input_audio.get("data") if isinstance( input_audio, dict, ) else None ) audio_format = ( input_audio.get("format") if isinstance( input_audio, dict, ) else None ) # File data is directly at root level file_data = content_item.get("file_data") file_id = content_item.get("file_id") file_url = content_item.get("file_url") filename = content_item.get("filename") else: content_type = getattr(content_item, "type", None) content_text = getattr(content_item, "text", None) content_refusal = getattr(content_item, "refusal", None) image_url = getattr(content_item, "image_url", None) # Audio data is in input_audio object input_audio = getattr(content_item, "input_audio", None) audio_data = ( getattr( input_audio, "data", None, ) if input_audio else None ) audio_format = ( getattr( input_audio, "format", None, ) if input_audio else None ) # File data is directly at root level file_data = getattr(content_item, "file_data", None) file_id = getattr(content_item, "file_id", None) file_url = getattr(content_item, "file_url", None) filename = getattr(content_item, "filename", None) # Convert different types of input content if content_type == "input_text" and content_text: return TextContent(type=ContentType.TEXT, text=content_text) elif content_type == "output_text" and content_text: return TextContent(type=ContentType.TEXT, text=content_text) elif content_type == "refusal" and content_refusal: return RefusalContent( type=ContentType.REFUSAL, refusal=content_refusal, ) elif content_type == "input_image" and image_url: return ImageContent(type=ContentType.IMAGE, image_url=image_url) elif content_type == "input_audio" and audio_data: return AudioContent( type=ContentType.AUDIO, data=audio_data, format=audio_format, ) elif content_type == "input_file" and ( file_url or file_id or file_data ): return FileContent( type=ContentType.FILE, file_url=file_url, file_id=file_id, filename=filename, ) return None # ===== Response conversion: Agent API → Responses API =====
[docs] def convert_agent_event_to_responses_event( self, agent_event: Event, ) -> Optional[List[ResponseStreamEvent]]: """ Convert Agent API event to Responses API stream event Args: agent_event: Agent API Event Returns: ResponseStreamEvent or None """ # 1. If it's a response message type, convert to response stream # message in responses api if isinstance(agent_event, BaseResponse): return self._convert_response_to_responses_event(agent_event) # 2. If it's a message message type, convert to corresponding # message type elif isinstance(agent_event, Message): return self._convert_message_to_responses_event(agent_event) # 3. If it's a content message, perform corresponding # content conversion elif isinstance(agent_event, Content): return self._convert_content_to_responses_event(agent_event) # Other types return None for now return None
def _convert_response_to_responses_event( self, response_event: BaseResponse, ) -> Optional[List[ResponseStreamEvent]]: """ Convert response message type to Responses API stream event Args: response_event: Agent API BaseResponse Returns: ResponseStreamEvent or None """ status = response_event.status responses = [] response = self._convert_agent_response_responses_api(response_event) response.output = self._output # Create corresponding events based on status if status == "created": created = ResponseCreatedEvent( type="response.created", response=response, sequence_number=0, ) # Will be set uniformly in responses_service responses.append(created) elif status == "in_progress": in_progress = ResponseInProgressEvent( type="response.in_progress", response=response, sequence_number=0, ) # Will be set uniformly in responses_service responses.append(in_progress) elif status == "completed": completed = ResponseCompletedEvent( type="response.completed", response=response, sequence_number=0, ) # Will be set uniformly in responses_service responses.append(completed) elif status == "failed": failed = ResponseFailedEvent( type="response.failed", response=response, sequence_number=0, ) # Will be set uniformly in responses_service responses.append(failed) return responses def _convert_message_to_responses_event( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Convert message message type to Responses API stream event Args: message_event: Agent API Message Returns: ResponseStreamEvent or None """ message_id = message_event.id # Check if message id already exists in temporary storage structure if message_id not in self._message_content_index_map: # If not, record it in the structure self._message_content_index_map[message_id] = { "message_type": message_event.type, "content_index_list": [], } # If message_id doesn't exist, handle new message return self._handle_new_message(message_event) else: # If message_id already exists, handle differently based on # message status return self._handle_existing_message(message_event) def _get_add_output_index(self, message_id: str): output_index = self._output_index if ( self._message_content_index_map and self._message_content_index_map[message_id] ): self._message_content_index_map[message_id][ "output_index" ] = output_index self._output_index += 1 return output_index def _get_output_index(self, message_id: str): if ( self._message_content_index_map and self._message_content_index_map[message_id] ): return self._message_content_index_map[message_id]["output_index"] return self._output_index def _handle_new_message( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: messages = [] # Handle different message types if message_event.type == MessageType.FUNCTION_CALL: self._get_add_output_index(message_event.id) elif message_event.type == MessageType.REASONING: # reasoning directly returns ResponseReasoningItem reasoning_item = self._convert_reasoning_to_output_message( message_event, ) item_added_event = ResponseOutputItemAddedEvent( type="response.output_item.added", item=reasoning_item, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service # sequence_number will be set uniformly in responses_service self._get_add_output_index(message_event.id) messages.append(item_added_event) elif message_event.type == MessageType.MCP_LIST_TOOLS: # Convert MCP tool list to ResponseOutputMessage output_message = self._convert_mcp_list_tools_to_output_message( message_event, ) output_item_added_event = ResponseOutputItemAddedEvent( type="response.output_item.added", item=output_message, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service # sequence_number will be set uniformly in responses_service self._get_add_output_index(message_event.id) messages.append(output_item_added_event) elif message_event.type == MessageType.MCP_TOOL_CALL: # Convert MCP tool call to ResponseFunctionToolCall function_tool_call = self._convert_mcp_tool_call_to_output_message( message_event, ) output_item_added_event = ResponseOutputItemAddedEvent( type="response.output_item.added", item=function_tool_call, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service # sequence_number will be set uniformly in responses_service self._get_add_output_index(message_event.id) messages.append(output_item_added_event) elif message_event.type == MessageType.MESSAGE: # Convert other types to ResponseOutputMessage add_output_message = self._convert_message_type_to_output_message( message_event, ) add_output_message.content = [] add_output_message.status = RunStatus.InProgress output_item_added_event = ResponseOutputItemAddedEvent( type="response.output_item.added", item=add_output_message, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service # sequence_number will be set uniformly in responses_service self._get_add_output_index(message_event.id) messages.append(output_item_added_event) if message_event.status == "completed": output_message = self._convert_message_type_to_output_message( message_event, ) if not output_message: return messages # Generate response.output_item.done # corresponding responses api object # sequence_number will be set uniformly in responses_service event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service self._output.append(output_message) messages.append(event) return messages def _handle_existing_message( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ # Handle existing message, generate corresponding events # based on message type and status Args: message_event: Agent API Message Returns: ResponseStreamEvent or None """ # Dispatch to corresponding handler functions based on message type if message_event.type == MessageType.MESSAGE: return self._handle_message_status_change(message_event) elif message_event.type == MessageType.FUNCTION_CALL: return self._handle_function_call_status_change(message_event) elif message_event.type == MessageType.MCP_LIST_TOOLS: return self._handle_mcp_list_tools_status_change(message_event) elif message_event.type == MessageType.MCP_TOOL_CALL: return self._handle_mcp_tool_call_status_change(message_event) elif message_event.type == MessageType.REASONING: return self._handle_reasoning_status_change(message_event) elif message_event.type == MessageType.ERROR: return self._handle_error_status_change(message_event) return None def _handle_message_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle MESSAGE type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") messages = [] if status == "completed": output_message = self._convert_message_type_to_output_message( message_event, ) if not output_message: return messages output_index = self._get_output_index(message_id=message_event.id) # Generate response.output_item.done # corresponding responses api object # sequence_number will be set uniformly in responses_service event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service self._output.append(output_message) messages.append(event) return messages def _handle_function_call_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle FUNCTION_CALL type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") if status == "completed": messages = [] output_message = None output_message = self._convert_function_call_to_output_message( message_event, ) if not output_message: return messages output_index = self._get_output_index(message_id=message_event.id) # Generate response.output_item.done # corresponding responses api object # sequence_number will be set uniformly in responses_service event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service messages.append(event) self._output.append(output_message) return messages return None def _handle_mcp_list_tools_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle MCP_LIST_TOOLS type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") events = [] # Get output_index output_index = self._get_output_index(message_event.id) if status == "in_progress": event = self._create_mcp_list_tools_in_progress_event( message_event, output_index, ) if event: events.append(event) elif status == "completed": completed_events = self._create_mcp_list_tools_completed_event( message_event, output_index, ) if completed_events: events.extend(completed_events) elif status == "failed": error_message = "MCP list tools operation failed" error_event = self._create_error_event( error_message=error_message, ) if error_event: events.append(error_event) return events if events else None def _handle_mcp_tool_call_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle MCP_TOOL_CALL type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") events = [] # Get output_index output_index = self._get_output_index(message_event.id) if status == "in_progress": event = self._create_mcp_tool_call_in_progress_event( message_event, output_index, ) if event: events.append(event) elif status == "completed": completed_events = self._create_mcp_tool_call_completed_event( message_event, output_index, ) if completed_events: events.extend(completed_events) elif status == "failed": error_message = "MCP tool call operation failed" error_event = self._create_error_event( error_message=error_message, ) if error_event: events.append(error_event) return events if events else None def _handle_reasoning_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle REASONING type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") if status == "completed": messages = [] output_message = self._convert_reasoning_to_output_message( message_event, ) if not output_message: return messages # Generate response.output_item.done # corresponding responses api object # sequence_number will be set uniformly in responses_service event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service messages.append(event) self._output.append(output_message) return messages return None def _handle_error_status_change( self, message_event: Message, ) -> Optional[List[ResponseStreamEvent]]: """ Handle ERROR type message status changes Args: message_event: Agent API Message Returns: ResponseStreamEvent list or None """ status = getattr(message_event, "status", "completed") if status == "completed": messages = [] output_message = self._convert_error_to_output_message( message_event, ) if not output_message: return messages # Generate response.output_item.done # corresponding responses api object # sequence_number will be set uniformly in responses_service event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=self._output_index, sequence_number=0, ) # Will be set uniformly in responses_service messages.append(event) return messages return None def _convert_message_type_to_output_message( self, message: Message, ) -> ResponseOutputMessage: """ Convert normal message type to ResponseOutputMessage Args: message: Agent API Message (type='message') Returns: ResponseOutputMessage: Responses API output message """ # Convert content output_content = [] if message.content: for content_item in message.content: if content_item.type == ContentType.TEXT: output_text = ResponseOutputText( type="output_text", text=content_item.text, annotations=[], ) output_content.append(output_text) elif content_item.type == ContentType.REFUSAL: # Handle REFUSAL type refusal_text = getattr(content_item, "refusal", "") output_refusal = ResponseOutputRefusal( type="refusal", refusal=refusal_text, ) output_content.append(output_refusal) return self._create_base_output_message(message, output_content) def _convert_function_call_to_output_message( self, message: Message, ) -> ResponseFunctionToolCall: """ Convert function_call type to ResponseFunctionToolCall Args: message: Agent API Message (type='function_call') Returns: ResponseFunctionToolCall: Responses API function tool call """ # Convert function call data function_call_data = {} if message.content: for content_item in message.content: if content_item.type == ContentType.DATA: function_call_data = content_item.data break if not isinstance(function_call_data, dict): function_call_data = {} # Create ResponseFunctionToolCall return ResponseFunctionToolCall( id=message.id, type="function_call", name=function_call_data.get("name", ""), arguments=function_call_data.get("arguments", ""), call_id=function_call_data.get("call_id", ""), status=message.status, ) def _convert_reasoning_to_output_message( self, message: Message, ) -> ResponseReasoningItem: """ Convert reasoning type to ResponseReasoningItem Args: message: Agent API Message (type='reasoning') Returns: ResponseReasoningItem: Responses API reasoning item """ # Extract reasoning text content from message content reasoning_text = "" if message.content: for content_item in message.content: if content_item.type == ContentType.TEXT: reasoning_text = content_item.text break # Create ResponseReasoningItem return ResponseReasoningItem( type="reasoning", id=message.id, summary=[], # Empty summary content=( [ReasoningContent(type="reasoning_text", text=reasoning_text)] if reasoning_text else None ), encrypted_content=None, status=None, ) def _convert_error_to_output_message( self, message: Message, ) -> ResponseOutputMessage: """ Convert error type to ResponseOutputMessage Args: message: Agent API Message (type='error') Returns: ResponseOutputMessage: Responses API output message """ # Convert error data to text content output_content = [] if message.content: for content_item in message.content: if content_item.type == ContentType.TEXT: # Convert error text content to ResponseOutputText error_text = content_item.text if error_text: output_text_obj = ResponseOutputText( type="output_text", text=error_text, annotations=[], ) output_content.append(output_text_obj) elif content_item.type == ContentType.DATA: # Handle error data content error_data = content_item.data if isinstance(error_data, dict): error_message = error_data.get( "message", str(error_data), ) if error_message: output_text_obj = ResponseOutputText( type="output_text", text=error_message, annotations=[], ) output_content.append(output_text_obj) return self._create_base_output_message(message, output_content) def _create_base_output_message( self, message: Message, content: List, ) -> ResponseOutputMessage: """ Create base ResponseOutputMessage object Args: message: Agent API Message content: Converted content list Returns: ResponseOutputMessage: Responses API output message """ # Determine status status = "completed" # Default status if hasattr(message, "status") and message.status: # Map Agent API status to Responses API status if message.status in ["in_progress", "completed", "incomplete"]: status = message.status else: status = "completed" # Other statuses default to completed return ResponseOutputMessage( id=message.id, type="message", role="assistant", content=content, status=status, ) def _convert_content_to_responses_event( self, content_event, ) -> Optional[ResponseStreamEvent]: """ Convert content message type to Responses API stream event Args: content_event: Agent API Content Returns: ResponseStreamEvent or None """ message_id = getattr(content_event, "msg_id", None) if not message_id: return None # Query corresponding message id from temporary storage structure # If message id doesn't exist, indicates abnormal situation, # should not process this content if message_id not in self._message_content_index_map: # Abnormal situation: message corresponding to # content event doesn't exist # This usually indicates message processing order issue, # directly return None return None # Get message information message_info = self._message_content_index_map[message_id] message_type = message_info["message_type"] # plugin calls need special adaptation, streaming not supported if message_type in [ MessageType.PLUGIN_CALL, MessageType.PLUGIN_CALL_OUTPUT, ]: return None content_indexes = message_info["content_index_list"] output_index = message_info["output_index"] # Check if corresponding index already exists # (need to determine index based on content) content_index = content_event.index new_content = content_index not in content_indexes # If not, insert if new_content: content_indexes.append(content_index) # Perform different content adaptation based on message type message_type = message_info["message_type"] if message_type == MessageType.MESSAGE: events = self._convert_message_content_to_responses_event( content_event, new_content, output_index, ) return events if events else None elif message_type == MessageType.FUNCTION_CALL: events = self._convert_function_call_content_to_responses_event( content_event, new_content, output_index, ) return events if events else None elif message_type == MessageType.REASONING: events = self._convert_reasoning_content_to_responses_event( content_event=content_event, output_index=output_index, ) return events if events else None elif message_type == MessageType.ERROR: events = self._convert_error_content_to_responses_event( content_event=content_event, new_content=new_content, ) return events if events else None return None
[docs] def convert_response_to_agent_events( self, response: Response, ) -> List[Event]: """ Convert OpenAI Response object to Agent API Event list Args: response: OpenAI Response object Returns: Agent API Event list """ events = [] # Reset sequence counter self.sequence_counter = 0 # Create response created event response_created = BaseResponse( sequence_number=0, # Will be set uniformly in responses_service object="response", status="created", error=None, ) events.append(response_created) # Process output items for output_item in response.output: if output_item.type == "message": # Convert to Agent API Message and Content events message_events = self._convert_output_message_to_events( output_item, ) events.extend(message_events) # Create response completed event response_completed = BaseResponse( sequence_number=0, # Will be set uniformly in responses_service object="response", status="completed", error=None, ) events.append(response_completed) return events
def _convert_output_message_to_events(self, output_message) -> List[Event]: """Convert OutputMessage to Agent API events""" events = [] # Create message in progress event message_id = f"msg_{uuid.uuid4().hex[:8]}" message_event = Message( sequence_number=0, # Will be set uniformly in responses_service object="message", status="in_progress", error=None, id=message_id, type=MessageType.MESSAGE, role=Role.ASSISTANT, ) events.append(message_event) # Process content items for content_item in output_message.content: if content_item.type == "output_text": # Create text content events text_content = TextContent( sequence_number=0, # Will be set uniformly in responses_service object="content", status="completed", error=None, type=ContentType.TEXT, msg_id=message_id, delta=False, text=content_item.text, ) events.append(text_content) # Create message completed event message_completed = Message( sequence_number=0, # Will be set uniformly in responses_service object="message", status="completed", error=None, id=message_id, type=MessageType.MESSAGE, role=Role.ASSISTANT, ) events.append(message_completed) return events def _convert_custom_tool_call_to_message(self, tool_call_item) -> Message: """Convert Custom tool call to Agent API Message""" # Extract tool call attributes if isinstance(tool_call_item, dict): call_id = tool_call_item.get("call_id", "") name = tool_call_item.get("name", "") input_data = tool_call_item.get("input", "") tool_id = tool_call_item.get("id", "") else: call_id = getattr(tool_call_item, "call_id", "") name = getattr(tool_call_item, "name", "") input_data = getattr(tool_call_item, "input", "") tool_id = getattr(tool_call_item, "id", "") # Create DataContent containing tool call data tool_call_data = { "call_id": call_id, "name": name, "input": input_data, "id": tool_id, } data_content = DataContent( type=ContentType.DATA, data=tool_call_data, delta=False, ) # Create Message message = Message(type=MessageType.PLUGIN_CALL, content=[data_content]) return message def _convert_custom_tool_call_output_to_message( self, tool_output_item, ) -> Message: """Convert Custom tool call output to Agent API Message""" # Extract tool call output attributes if isinstance(tool_output_item, dict): call_id = tool_output_item.get("call_id", "") output = tool_output_item.get("output", "") output_id = tool_output_item.get("id", "") else: call_id = getattr(tool_output_item, "call_id", "") output = getattr(tool_output_item, "output", "") output_id = getattr(tool_output_item, "id", "") # Create DataContent containing tool call output data tool_output_data = { "call_id": call_id, "output": output, "id": output_id, } data_content = DataContent( type=ContentType.DATA, data=tool_output_data, delta=False, ) # Create Message message = Message( type=MessageType.PLUGIN_CALL_OUTPUT, content=[data_content], ) return message def _convert_function_call_to_message(self, function_call_item) -> Message: """Convert Function tool call to Agent API Message""" # Extract function call attributes if isinstance(function_call_item, dict): name = function_call_item.get("name", "") arguments = function_call_item.get("arguments", "") call_id = function_call_item.get("call_id", "") else: name = getattr(function_call_item, "name", "") arguments = getattr(function_call_item, "arguments", "") call_id = getattr(function_call_item, "call_id", "") # Create DataContent containing function call data function_call_data = ToolCall.model_validate( { "name": name, "arguments": arguments, "call_id": call_id, }, ).model_dump() data_content = DataContent( type=ContentType.DATA, data=function_call_data, ) # Create Message message = Message( type=MessageType.FUNCTION_CALL, content=[data_content], ) return message def _convert_function_call_output_to_message( self, function_output_item, ) -> Message: """Convert Function tool call output to Agent API Message""" # Extract function call output attributes if isinstance(function_output_item, dict): call_id = function_output_item.get("call_id", "") output = function_output_item.get("output", "") else: call_id = getattr(function_output_item, "call_id", "") output = getattr(function_output_item, "output", "") # Create DataContent containing function call output data function_output_data = ToolCallOutput.model_validate( { "call_id": call_id, "output": output, }, ).model_dump() data_content = DataContent( type=ContentType.DATA, data=function_output_data, ) # Create Message message = Message( type=MessageType.FUNCTION_CALL_OUTPUT, content=[data_content], ) return message # ===== Content adaptation methods ===== def _convert_message_content_to_responses_event( self, content_event, new_content: bool, output_index: int = 0, ) -> Optional[ResponseStreamEvent]: """ Convert MESSAGE type content to Responses API events Args: content_event: Agent API Content event new_content: whether it is new content Returns: ResponseStreamEvent or None """ events = [] # Determine event type based on content type content_type = getattr(content_event, "type", None) content_status = getattr(content_event, "status", None) if content_type == ContentType.TEXT: # If content is new, generate a response.content_part.added event if new_content: content_add_event = self._create_content_part_added_event( content_event, output_index, ) events.append(content_add_event) # If content is completed, generate a # response.content_part.done event if content_status == "completed": output_text_done_event = self._create_output_text_done_event( content_event, output_index, ) events.append(output_text_done_event) content_done_event = self._create_content_part_done_event( content_event, output_index, ) events.append(content_done_event) if content_status == "in_progress": content_in_progress_event = self._create_text_delta_event( content_event, output_index, ) events.append(content_in_progress_event) if content_type == ContentType.REFUSAL: if new_content: content_add_event = self._create_content_part_added_event( content_event, output_index, ) events.append(content_add_event) if content_status == "completed": output_text_done_event = self._create_refusal_text_done_event( content_event, output_index, ) events.append(output_text_done_event) content_done_event = self._create_content_part_done_event( content_event, output_index, ) events.append(content_done_event) if content_status == "in_progress": content_in_progress_event = ( self._create_refusal_text_delta_event( content_event, output_index, ) ) events.append(content_in_progress_event) return events def _convert_function_call_content_to_responses_event( self, content_event, new_content: bool, output_index: int = 0, ) -> Optional[ResponseStreamEvent]: """ Convert FUNCTION_CALL type content to Responses API events Args: content_event: Agent API Content event new_content: whether it is new content Returns: ResponseStreamEvent or None """ events = [] # Get content type and status content_type = getattr(content_event, "type", None) content_status = getattr(content_event, "status", None) if content_type == ContentType.DATA: if new_content: add_event = ( self._create_function_call_arguments_add_output_item_event( content_event=content_event, output_index=output_index, ) ) events.append(add_event) # Extract function call information from data function = getattr(content_event, "data", {}) if isinstance(function, dict): arguments = function.get("arguments", "") # Generate function_call_arguments.delta event if content_status == "in_progress": delta_event = ( self._create_function_call_arguments_delta_event( content_event, arguments, output_index, ) ) events.append(delta_event) if content_status == "completed": done_event = ( self._create_function_call_arguments_done_event( content_event, arguments, output_index, ) ) events.append(done_event) return events if events else None def _convert_reasoning_content_to_responses_event( self, content_event, output_index: int = 0, ) -> Optional[ResponseStreamEvent]: """ Convert REASONING type content to Responses API events Args: content_event: Agent API Content event new_content: whether it is new content output_index: output index Returns: ResponseStreamEvent or None """ events = [] # Get content type and status content_type = getattr(content_event, "type", None) content_status = getattr(content_event, "status", None) if content_type == ContentType.TEXT: # Extract reasoning content from text reasoning_text = getattr(content_event, "text", "") # Generate reasoning_text.delta event if content_status == "in_progress": delta_event = self._create_reasoning_text_delta_event( content_event, reasoning_text, output_index, ) events.append(delta_event) if content_status == "completed": # First generate delta event, then generate done event delta_event = self._create_reasoning_text_delta_event( content_event, reasoning_text, output_index, ) events.append(delta_event) done_event = self._create_reasoning_text_done_event( content_event, reasoning_text, output_index, ) events.append(done_event) return events if events else None def _convert_error_content_to_responses_event( self, content_event, new_content: bool, ) -> Optional[ResponseStreamEvent]: """ Convert ERROR type content to Responses API events Args: content_event: Agent API Content event new_content: whether it is new content Returns: ResponseStreamEvent or None """ events = [] # Get content type and status content_type = getattr(content_event, "type", None) content_status = getattr(content_event, "status", None) if content_type == ContentType.TEXT: # Extract error message from text error_text = getattr(content_event, "text", "") if new_content and content_status == "completed": # Generate error event error_event = self._create_error_event( error_message=error_text, ) events.append(error_event) elif content_type == ContentType.DATA: # Extract error information from data data = getattr(content_event, "data", {}) if isinstance(data, dict): error_message = data.get("message", str(data)) if new_content and content_status == "completed": # Generate error event error_event = self._create_error_event( error_message=error_message, ) events.append(error_event) return events if events else None def _create_content_part_added_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.content_part.added event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Create corresponding part based on content type content_type = getattr(content_event, "type", None) if content_type == ContentType.TEXT: part = ResponseOutputText( type="output_text", text="", annotations=[], ) elif content_type == ContentType.REFUSAL: part = ResponseOutputRefusal(type="refusal", refusal="") else: # Default to text type part = ResponseOutputText( type="output_text", text=getattr(content_event, "text", ""), annotations=[], ) # Generate response.content_part.added structure # sequence_number will be set uniformly in responses_service return ResponseContentPartAddedEvent( type="response.content_part.added", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, part=part, sequence_number=0, ) # Will be set uniformly in responses_service def _create_content_part_done_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.content_part.done event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Create corresponding part based on content type content_type = getattr(content_event, "type", None) if content_type == ContentType.TEXT: part = ResponseOutputText( type="output_text", text=getattr(content_event, "text", ""), annotations=[], ) elif content_type == ContentType.REFUSAL: part = ResponseOutputRefusal( type="refusal", refusal=getattr( content_event, "refusal", "", ), ) else: # Default to text type part = ResponseOutputText( type="output_text", text=getattr(content_event, "text", ""), annotations=[], ) # Generate response.content_part.done structure # sequence_number will be set uniformly in responses_service return ResponseContentPartDoneEvent( type="response.content_part.done", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, part=part, sequence_number=0, ) # Will be set uniformly in responses_service def _create_output_text_done_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.output_text.done event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Generate response.output_text.done structure # sequence_number will be set uniformly in responses_service return ResponseTextDoneEvent( type="response.output_text.done", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, text=getattr(content_event, "text", ""), logprobs=[], # Temporarily use empty list, can add logprobs support later sequence_number=0, ) # Will be set uniformly in responses_service def _create_text_delta_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.output_text.delta event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Generate response.output_text.delta structure # sequence_number will be set uniformly in responses_service return ResponseTextDeltaEvent( type="response.output_text.delta", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, delta=getattr(content_event, "text", ""), logprobs=[], # Temporarily use empty list, can add logprobs support later sequence_number=0, ) # Will be set uniformly in responses_service def _create_refusal_text_done_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.refusal.done event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Generate response.refusal.done structure # sequence_number will be set uniformly in responses_service return ResponseRefusalDoneEvent( type="response.refusal.done", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, refusal=getattr(content_event, "refusal", ""), sequence_number=0, ) # Will be set uniformly in responses_service def _create_refusal_text_delta_event( self, content_event: Content, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.refusal.delta event Args: content_event: Agent API Content event Returns: ResponseStreamEvent: Responses API event """ # Generate response.refusal.delta structure # sequence_number will be set uniformly in responses_service return ResponseRefusalDeltaEvent( type="response.refusal.delta", content_index=content_event.index, item_id=content_event.msg_id, output_index=output_index, delta=getattr(content_event, "refusal", ""), sequence_number=0, ) # Will be set uniformly in responses_service def _next_sequence(self) -> int: """Get next sequence number""" current = self.sequence_counter # sequence_number will be set uniformly in responses_service return current # ===== New event creation methods ===== def _create_function_call_arguments_add_output_item_event( self, content_event, output_index: int = 0, ) -> ResponseStreamEvent: """ Create function call corresponding response.output_item.added event Args: content_event: Agent API Content event arguments: function call parameters output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Convert function call data function_call_data = {} if content_event: if content_event.type == ContentType.DATA: function_call_data = content_event.data if not isinstance(function_call_data, dict): function_call_data = {} # Create ResponseFunctionToolCall function_tool_call = ResponseFunctionToolCall( type="function_call", name=function_call_data.get("name", ""), arguments="", call_id=function_call_data.get("call_id", ""), status=content_event.status, ) return ResponseOutputItemAddedEvent( type="response.output_item.added", item=function_tool_call, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_function_call_arguments_delta_event( self, content_event, arguments: str, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.function_call_arguments.delta event Args: content_event: Agent API Content event arguments: function call parameters output_index: output index Returns: ResponseStreamEvent: Responses API event """ # sequence_number will be set uniformly in responses_service return ResponseFunctionCallArgumentsDeltaEvent( type="response.function_call_arguments.delta", delta=arguments, item_id=content_event.msg_id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_function_call_arguments_done_event( self, content_event, arguments: str, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.function_call_arguments.done event Args: content_event: Agent API Content event arguments: function call parameters output_index: output index Returns: ResponseStreamEvent: Responses API event """ # sequence_number will be set uniformly in responses_service return ResponseFunctionCallArgumentsDoneEvent( type="response.function_call_arguments.done", arguments=arguments, item_id=content_event.msg_id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_reasoning_text_delta_event( self, content_event, text: str, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.reasoning_text.delta event Args: content_event: Agent API Content event text: reasoning text content output_index: output index Returns: ResponseStreamEvent: Responses API event """ # sequence_number will be set uniformly in responses_service return ResponseReasoningTextDeltaEvent( type="response.reasoning_text.delta", content_index=content_event.index, delta=text, item_id=content_event.msg_id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_reasoning_text_done_event( self, content_event, text: str, output_index: int = 0, ) -> ResponseStreamEvent: """ Create response.reasoning_text.done event Args: content_event: Agent API Content event text: reasoning text content output_index: output index Returns: ResponseStreamEvent: Responses API event """ # sequence_number will be set uniformly in responses_service return ResponseReasoningTextDoneEvent( type="response.reasoning_text.done", content_index=content_event.index, text=text, item_id=content_event.msg_id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _convert_agent_error_to_responses_error( self, agent_error, ) -> Optional[Any]: """ Convert Agent API Error to Responses API ResponseError Args: agent_error: Agent API Error object Returns: ResponseError: Responses API ResponseError or None """ if not agent_error: return None try: # Extract error information from Agent API Error object error_code = getattr(agent_error, "code", "server_error") error_message = getattr( agent_error, "message", "Unknown error occurred", ) # Map Agent API error code to Responses API error code # If Agent API code is not in Responses API allowed range, # use server_error as default valid_codes = [ "server_error", "rate_limit_exceeded", "invalid_prompt", "vector_store_timeout", "invalid_image", "invalid_image_format", "invalid_base64_image", "invalid_image_url", "image_too_large", "image_too_small", "image_parse_error", "image_content_policy_violation", "invalid_image_mode", "image_file_too_large", "unsupported_image_media_type", "empty_image_file", "failed_to_download_image", "image_file_not_found", ] # If Agent API code is in valid range, use directly; # otherwise use server_error mapped_code = ( error_code if error_code in valid_codes else "server_error" ) # Create Responses API ResponseError from openai.types.responses import ResponseError return ResponseError(code=mapped_code, message=error_message) except Exception as e: # If conversion fails, log error and return None print(f"Error converting agent error to responses error: {e}") return None def _create_error_event( self, error_message: str, ) -> ResponseStreamEvent: """ Create error event Args: content_event: Agent API Content event error_message: error message output_index: output index Returns: ResponseStreamEvent: Responses API event """ # sequence_number will be set uniformly in responses_service return ResponseErrorEvent( type="error", message=error_message, sequence_number=0, ) # Will be set uniformly in responses_service def _create_function_call_item_added_event( self, content_event, output_index: int = 0, ) -> ResponseStreamEvent: """ Create function_call type response.output_item.added event Args: content_event: Agent API Content event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Extract function call information from data data = getattr(content_event, "data", {}) name = data.get("name", "") arguments = data.get("arguments", "") call_id = data.get("call_id", "") # Create ResponseFunctionToolCall as item item = ResponseFunctionToolCall( type="function_call", name=name, arguments=arguments, call_id=call_id, id=data.get("id"), status=data.get("status"), ) # Generate response.output_item.added structure # sequence_number will be set uniformly in responses_service return ResponseOutputItemAddedEvent( type="response.output_item.added", item=item, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_reasoning_item_added_event( self, content_event, output_index: int = 0, ) -> ResponseStreamEvent: """ Create reasoning type response.output_item.added event Args: content_event: Agent API Content event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Extract reasoning content from text reasoning_text = getattr(content_event, "text", "") # Create ResponseReasoningItem as item item = ResponseReasoningItem( type="reasoning", id=content_event.msg_id, summary=[], # Empty summary content=( [Content(type="reasoning_text", text=reasoning_text)] if reasoning_text else None ), encrypted_content=None, status=None, ) # Generate response.output_item.added structure # sequence_number will be set uniformly in responses_service return ResponseOutputItemAddedEvent( type="response.output_item.added", item=item, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _convert_mcp_list_tools_to_output_message( self, message: Message, ) -> McpListTools: """ Convert MCP tool list message to McpListTools Args: message: Agent API Message (type='mcp_list_tools') Returns: McpListTools: Responses API MCP tool list """ # Convert MCP tool list data mcp_data = {} if message.content: for content_item in message.content: if content_item.type == ContentType.DATA: mcp_data = content_item.data break if not isinstance(mcp_data, dict): mcp_data = {} # Extract tool list information tools_info = mcp_data.get("tools", []) tools = [] for tool_info in tools_info: if isinstance(tool_info, dict): tool = McpListToolsTool( name=tool_info.get("name", "unknown"), input_schema=tool_info.get("input_schema", {}), description=tool_info.get("description", ""), annotations=tool_info.get("annotations"), ) tools.append(tool) # Create McpListTools return McpListTools( id=message.id, server_label=mcp_data.get("server_label", "MCP Server"), tools=tools, type="mcp_list_tools", ) def _convert_mcp_tool_call_to_output_message( self, message: Message, ) -> McpCall: """ Convert MCP tool call message to McpCall Args: message: Agent API Message (type='mcp_call') Returns: McpCall: Responses API MCP tool call """ # Convert MCP tool call data mcp_call_data = {} if message.content: for content_item in message.content: if content_item.type == ContentType.DATA: mcp_call_data = content_item.data break if not isinstance(mcp_call_data, dict): mcp_call_data = {} # Extract MCP tool call information tool_name = mcp_call_data.get("name", "mcp_tool") tool_arguments = mcp_call_data.get("arguments", "{}") server_label = mcp_call_data.get("server_label", "MCP Server") # Create McpCall return McpCall( id=message.id, name=tool_name, arguments=tool_arguments, server_label=server_label, type="mcp_call", error=mcp_call_data.get("error"), output=mcp_call_data.get("output"), ) def _create_mcp_list_tools_item_added_event( self, content_event, output_index: int = 0, ) -> ResponseStreamEvent: """ Create MCP tool list item added event Args: content_event: Agent API Content event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Extract MCP tool list information from data data = getattr(content_event, "data", {}) if not isinstance(data, dict): data = {} # Extract tool list information tools_info = data.get("tools", []) tools = [] for tool_info in tools_info: if isinstance(tool_info, dict): tool = McpListToolsTool( name=tool_info.get("name", "unknown"), input_schema=tool_info.get("input_schema", {}), description=tool_info.get("description", ""), annotations=tool_info.get("annotations"), ) tools.append(tool) # Create McpListTools as item item = McpListTools( id=content_event.msg_id, server_label=data.get("server_label", "MCP Server"), tools=tools, type="mcp_list_tools", ) # Generate response.output_item.added structure # sequence_number will be set uniformly in responses_service return ResponseOutputItemAddedEvent( type="response.output_item.added", item=item, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_mcp_tool_call_item_added_event( self, content_event, output_index: int = 0, ) -> ResponseStreamEvent: """ Create MCP tool call item added event Args: content_event: Agent API Content event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Extract MCP tool call information from data data = getattr(content_event, "data", {}) if not isinstance(data, dict): data = {} # Extract MCP tool call information tool_name = data.get("name", "mcp_tool") tool_arguments = data.get("arguments", "{}") server_label = data.get("server_label", "MCP Server") # Create McpCall as item item = McpCall( id=content_event.msg_id, name=tool_name, arguments=tool_arguments, server_label=server_label, type="mcp_call", error=data.get("error"), output=data.get("output"), ) # Generate response.output_item.added structure # sequence_number will be set uniformly in responses_service return ResponseOutputItemAddedEvent( type="response.output_item.added", item=item, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_mcp_list_tools_in_progress_event( self, message_event: Message, output_index: int = 0, ) -> ResponseStreamEvent: """ Create MCP tool list in_progress event Args: message_event: Agent API Message event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Generate response.mcp_list_tools.in_progress event # sequence_number will be set uniformly in responses_service return ResponseMcpListToolsInProgressEvent( type="response.mcp_list_tools.in_progress", item_id=message_event.id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_mcp_list_tools_completed_event( self, message_event: Message, output_index: int = 0, ) -> List[ResponseStreamEvent]: """ Create MCP tool list completed event Args: message_event: Agent API Message event output_index: output index Returns: List[ResponseStreamEvent]: Responses API event list """ events = [] # 1. Generate response.mcp_list_tools.completed event mcp_completed_event = ResponseMcpListToolsCompletedEvent( type="response.mcp_list_tools.completed", item_id=message_event.id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service events.append(mcp_completed_event) # 2. Generate response.output_item.done event output_message = self._convert_mcp_list_tools_to_output_message( message_event, ) if output_message: output_item_done_event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service events.append(output_item_done_event) # Add to _output list self._output.append(output_message) return events def _create_mcp_tool_call_in_progress_event( self, message_event: Message, output_index: int = 0, ) -> ResponseStreamEvent: """ Create MCP tool call in_progress event Args: message_event: Agent API Message event output_index: output index Returns: ResponseStreamEvent: Responses API event """ # Generate response.mcp_call.in_progress event # sequence_number will be set uniformly in responses_service return ResponseMcpCallInProgressEvent( type="response.mcp_call.in_progress", item_id=message_event.id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service def _create_mcp_tool_call_completed_event( self, message_event: Message, output_index: int = 0, ) -> List[ResponseStreamEvent]: """ Create MCP tool call completed event Args: message_event: Agent API Message event output_index: output index Returns: List[ResponseStreamEvent]: Responses API event list """ events = [] # 1. Generate response.mcp_call.completed event mcp_completed_event = ResponseMcpCallCompletedEvent( type="response.mcp_call.completed", item_id=message_event.id, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service events.append(mcp_completed_event) # 2. Generate response.output_item.done event output_message = self._convert_mcp_tool_call_to_output_message( message_event, ) if output_message: output_item_done_event = ResponseOutputItemDoneEvent( type="response.output_item.done", item=output_message, output_index=output_index, sequence_number=0, ) # Will be set uniformly in responses_service events.append(output_item_done_event) # Add to _output list self._output.append(output_message) return events
# Export main adapter class __all__ = ["ResponsesAdapter"]