# -*- coding: utf-8 -*-
from typing import List, Optional, Union
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import ChoiceDeltaToolCall
from agentscope_runtime.engine.schemas.agent_schemas import (
Role,
FunctionCall,
AgentResponse,
RunStatus,
Message,
TextContent,
)
# Use OpenAI's ToolCall type instead of agentscope_runtime
ToolCall = ChoiceDeltaToolCall
# TODO: add this for streaming structured output support later
[docs]
def merge_incremental_chunk( # pylint: disable=too-many-branches,too-many-nested-blocks # noqa: E501
responses: List[ChatCompletionChunk],
) -> Optional[ChatCompletionChunk]:
"""
Merge an incremental chunk list to a ChatCompletionChunk.
Args:
responses (List[ChatCompletionChunk]): List of incremental chat
completion chunks to merge into a single response.
Returns:
Optional[ChatCompletionChunk]: The merged chat completion chunk,
or None if the input list is empty.
"""
if len(responses) == 0:
return None
if not isinstance(responses[0], ChatCompletionChunk):
return None
# get usage or finish reason
merged = ChatCompletionChunk(**responses[-1].__dict__)
# if the responses has usage info, then merge the finish reason chunk to
# usage chunk
if not merged.choices and len(responses) > 1:
merged.choices = responses[-2].choices
# might be multiple tool calls result
tool_calls_dict = {}
for resp in reversed(responses[:-1]):
for i, j in zip(merged.choices, resp.choices):
# jump the finish reason chunk
if (i.delta.content is None and j.delta.content is not None) and (
i.delta.tool_calls is None and j.delta.tool_calls is not None
):
continue
if j.delta.role == Role.TOOL:
continue
# merge content
if not i.delta.content and isinstance(j.delta.content, str):
i.delta.content = j.delta.content
elif isinstance(i.delta.content, str) and isinstance(
j.delta.content,
str,
):
i.delta.content = j.delta.content + i.delta.content
# merge tool calls
elif not i.delta.tool_calls and isinstance(
j.delta.tool_calls,
list,
):
for tool_call in j.delta.tool_calls:
if tool_call.index not in tool_calls_dict:
tool_calls_dict[tool_call.index] = tool_call
# make sure function.arguments is a string
if not tool_call.function.arguments:
tool_calls_dict[
tool_call.index
].function.arguments = ""
else:
if tool_call.id != "":
tool_calls_dict[tool_call.index].id = tool_call.id
if tool_call.function.name:
tool_calls_dict[
tool_call.index
].function.name = tool_call.function.name
if (
tool_call.function.arguments
and not tool_calls_dict[
tool_call.index
].function.arguments.startswith("{")
):
tool_calls_dict[
tool_call.index
].function.arguments = (
tool_call.function.arguments
+ tool_calls_dict[
tool_call.index
].function.arguments
)
if merged.usage and resp.usage:
merged.usage.prompt_tokens += resp.usage.prompt_tokens
merged.usage.completion_tokens += resp.usage.completion_tokens
merged.usage.total_tokens += resp.usage.total_tokens
if tool_calls_dict:
merged.choices[0].delta.tool_calls = [
ToolCall(
id=tool_call.id,
type=tool_call.type,
function=FunctionCall(**tool_call.function.__dict__),
)
for tool_call in tool_calls_dict.values()
]
return merged
[docs]
def get_finish_reason(response: ChatCompletionChunk) -> Optional[str]:
finish_reason = None
if not isinstance(response, ChatCompletionChunk):
return finish_reason
if response.choices:
if response.choices[0].finish_reason:
finish_reason = response.choices[0].finish_reason
return finish_reason
[docs]
def merge_agent_response( # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements,too-many-nested-blocks # noqa: E501
responses: List[Union[AgentResponse, Message, TextContent]],
) -> AgentResponse:
"""
Merge a list of incremental response objects into a single AgentResponse.
Args:
responses (List[Union[AgentResponse, Message, TextContent]]):
List of incremental responses to merge into a single response.
Returns:
AgentResponse: The merged agent response.
"""
if len(responses) == 0:
raise ValueError("Cannot merge empty response list")
# Check if all responses are of the same object type
object_types = set()
for resp in responses:
if hasattr(resp, "object"):
object_types.add(resp.object)
else:
# If no object field, treat as AgentResponse
object_types.add("response")
if len(object_types) > 1:
# Mixed object types, convert the last response to AgentResponse
last_resp = responses[-1]
if isinstance(last_resp, TextContent):
# Convert TextContent to Message, then to AgentResponse
message = Message(
role=Role.ASSISTANT,
content=[last_resp],
status=last_resp.status or RunStatus.Completed,
)
return AgentResponse(
output=[message],
status=message.status,
session_id=None,
)
elif isinstance(last_resp, Message):
return AgentResponse(
output=[last_resp],
status=last_resp.status,
session_id=None,
)
else:
return AgentResponse(**last_resp.__dict__)
object_type = list(object_types)[0]
if object_type == "content":
# For content objects, merge text content and convert to AgentResponse
text_contents = [
resp for resp in responses if hasattr(resp, "text") and resp.text
]
if not text_contents:
# Return empty AgentResponse if no text content
return AgentResponse(
status=RunStatus.Completed,
session_id=None,
)
# Merge all text content
merged_text = ""
last_content = text_contents[-1]
for content in text_contents:
if content.delta:
merged_text += content.text
else:
merged_text = content.text
# Create a Message with merged content
final_content = TextContent(
text=merged_text,
delta=False,
index=0,
msg_id=last_content.msg_id,
status=RunStatus.Completed,
)
message = Message(
role=Role.ASSISTANT,
content=[final_content],
status=RunStatus.Completed,
)
return AgentResponse(
output=[message],
status=RunStatus.Completed,
session_id=None,
)
elif object_type == "message":
# For message objects, convert to AgentResponse
messages = [resp for resp in responses if isinstance(resp, Message)]
if not messages:
return AgentResponse(
status=RunStatus.Completed,
session_id=None,
)
# Return the last message as AgentResponse
last_message = messages[-1]
return AgentResponse(
output=[last_message],
status=last_message.status,
session_id=None,
)
else:
# For response objects, use existing logic
# Filter only AgentResponse objects
agent_responses = [
resp for resp in responses if isinstance(resp, AgentResponse)
]
if len(agent_responses) == 0:
last_resp = responses[-1]
if isinstance(last_resp, Message):
return AgentResponse(
output=[last_resp],
status=last_resp.status,
session_id=None,
)
else:
return AgentResponse(**last_resp.__dict__)
# Get the last AgentResponse as base
merged = AgentResponse(**agent_responses[-1].__dict__)
# If no output, return the merged response
if not merged.output:
return merged
# Merge content from all AgentResponse objects
content_dict = {}
for resp in agent_responses:
if not resp.output:
continue
for message in resp.output:
if not message.content:
continue
for content in message.content:
if (
content.type == "text"
and hasattr(content, "text")
and content.text
):
# For text content, accumulate the text
if content.msg_id not in content_dict:
content_dict[content.msg_id] = {
"content": content,
"text": content.text,
"delta": content.delta,
}
else:
# If delta is True, append text; if False, replace
if content.delta:
content_dict[content.msg_id][
"text"
] += content.text
else:
content_dict[content.msg_id][
"text"
] = content.text
# Update the content object with merged text
content_dict[content.msg_id][
"content"
].text = content_dict[content.msg_id]["text"]
content_dict[content.msg_id][
"content"
].delta = False
# Update the merged response with accumulated content
if content_dict:
for message in merged.output:
if message.content:
for content in message.content:
if (
content.type == "text"
and hasattr(content, "msg_id")
and content.msg_id in content_dict
):
content.text = content_dict[content.msg_id]["text"]
content.delta = False
return merged
[docs]
def get_agent_response_finish_reason(
response: Union[AgentResponse, Message, TextContent],
) -> Optional[str]:
"""
Get the finish reason from a response object.
Args:
response (Union[AgentResponse, Message, TextContent]):
The response object
Returns:
Optional[str]: The finish reason, or None if not finished
"""
# Only consider AgentResponse objects as potential final responses
if isinstance(response, AgentResponse):
if (
hasattr(response, "status")
and response.status == RunStatus.Completed
):
# Check if this is a final response with output
if hasattr(response, "output") and response.output:
return "stop"
return None
[docs]
def merge_agent_message( # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements,too-many-nested-blocks # noqa: E501
messages: List[Union[Message, TextContent]],
) -> Message:
"""
Merge a list of incremental message objects into a single Message.
Args:
messages (List[Union[Message, TextContent]]):
List of incremental messages to merge into a single message.
Returns:
Message: The merged message.
"""
if len(messages) == 0:
raise ValueError("Cannot merge empty message list")
# Check if all messages are of the same object type
object_types = set()
for msg in messages:
if hasattr(msg, "object"):
object_types.add(msg.object)
else:
# If no object field, treat as Message
object_types.add("message")
if len(object_types) > 1:
# Mixed object types, convert the last message to Message
last_msg = messages[-1]
if isinstance(last_msg, TextContent):
# Convert TextContent to Message with delta=False
final_content = TextContent(
text=last_msg.text,
delta=False,
index=last_msg.index,
msg_id=last_msg.msg_id,
status=RunStatus.Completed,
)
return Message(
role=Role.ASSISTANT,
content=[final_content],
status=RunStatus.Completed,
)
else:
return Message(**last_msg.__dict__)
object_type = list(object_types)[0]
if object_type == "content":
# For content objects, merge text content and convert to Message
text_contents = [
msg for msg in messages if hasattr(msg, "text") and msg.text
]
if not text_contents:
# Return empty Message if no text content
return Message(
role=Role.ASSISTANT,
status=RunStatus.Completed,
)
# Merge all text content
merged_text = ""
last_content = text_contents[-1]
for content in text_contents:
if content.delta:
merged_text += content.text
else:
merged_text = content.text
# Create a Message with merged content
final_content = TextContent(
text=merged_text,
delta=False,
index=0,
msg_id=last_content.msg_id,
status=RunStatus.Completed,
)
return Message(
role=Role.ASSISTANT,
content=[final_content],
status=RunStatus.Completed,
)
else:
# For message objects, use existing logic
# Filter only Message objects
message_objects = [msg for msg in messages if isinstance(msg, Message)]
if len(message_objects) == 0:
last_msg = messages[-1]
if isinstance(last_msg, TextContent):
return Message(
role=Role.ASSISTANT,
content=[last_msg],
status=last_msg.status or RunStatus.Completed,
)
else:
return Message(**last_msg.__dict__)
# Get the last Message as base
merged = Message(**message_objects[-1].__dict__)
# If no content, return the merged message
if not merged.content:
return merged
# Merge content from all Message objects
# Use msg_id + index as key to avoid overwriting different contents
# with the same msg_id but different indexes
content_dict = {}
for msg in message_objects:
if not msg.content:
continue
for content in msg.content:
if (
content.type == "text"
and hasattr(content, "text")
and content.text
):
# Create unique key using msg_id and index
content_key = f"{content.msg_id}_{content.index}"
# For text content, accumulate the text
if content_key not in content_dict:
content_dict[content_key] = {
"content": content,
"text": content.text,
"delta": content.delta,
"msg_id": content.msg_id,
"index": content.index,
}
else:
# If delta is True, append text; if False, replace
if content.delta:
content_dict[content_key]["text"] += content.text
else:
content_dict[content_key]["text"] = content.text
# Update the content object with merged text
content_dict[content_key][
"content"
].text = content_dict[content_key]["text"]
content_dict[content_key]["content"].delta = False
# Update the merged message with accumulated content
if content_dict:
for content in merged.content:
if (
content.type == "text"
and hasattr(content, "msg_id")
and hasattr(content, "index")
):
content_key = f"{content.msg_id}_{content.index}"
if content_key in content_dict:
content.text = content_dict[content_key]["text"]
content.delta = False
return merged
[docs]
def get_agent_message_finish_reason(
message: Optional[Union[Message, TextContent]],
) -> Optional[str]:
if message is None:
return None
if isinstance(message, Message):
return "stop" if message.status == RunStatus.Completed else None
return None