Extract chat completions conversion code into helper (#522)

Small refactor for rest of stack.

---
[//]: # (BEGIN SAPLING FOOTER)
* #524
* #523
* __->__ #522
This commit is contained in:
Rohan Mehta 2025-04-15 18:31:17 -04:00 committed by GitHub
parent ce1abe6006
commit 80de53e879
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 530 additions and 511 deletions

View file

@ -0,0 +1,466 @@
from __future__ import annotations
import json
from collections.abc import Iterable
from typing import Any, Literal, cast
from openai import NOT_GIVEN, NotGiven
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionContentPartImageParam,
ChatCompletionContentPartParam,
ChatCompletionContentPartTextParam,
ChatCompletionDeveloperMessageParam,
ChatCompletionMessage,
ChatCompletionMessageParam,
ChatCompletionMessageToolCallParam,
ChatCompletionSystemMessageParam,
ChatCompletionToolChoiceOptionParam,
ChatCompletionToolMessageParam,
ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_tool_param import ChatCompletionToolParam
from openai.types.chat.completion_create_params import ResponseFormat
from openai.types.responses import (
EasyInputMessageParam,
ResponseFileSearchToolCallParam,
ResponseFunctionToolCall,
ResponseFunctionToolCallParam,
ResponseInputContentParam,
ResponseInputImageParam,
ResponseInputTextParam,
ResponseOutputMessage,
ResponseOutputMessageParam,
ResponseOutputRefusal,
ResponseOutputText,
)
from openai.types.responses.response_input_param import FunctionCallOutput, ItemReference, Message
from ..agent_output import AgentOutputSchema
from ..exceptions import AgentsException, UserError
from ..handoffs import Handoff
from ..items import TResponseInputItem, TResponseOutputItem
from ..tool import FunctionTool, Tool
from .fake_id import FAKE_RESPONSES_ID
class Converter:
@classmethod
def convert_tool_choice(
cls, tool_choice: Literal["auto", "required", "none"] | str | None
) -> ChatCompletionToolChoiceOptionParam | NotGiven:
if tool_choice is None:
return NOT_GIVEN
elif tool_choice == "auto":
return "auto"
elif tool_choice == "required":
return "required"
elif tool_choice == "none":
return "none"
else:
return {
"type": "function",
"function": {
"name": tool_choice,
},
}
@classmethod
def convert_response_format(
cls, final_output_schema: AgentOutputSchema | None
) -> ResponseFormat | NotGiven:
if not final_output_schema or final_output_schema.is_plain_text():
return NOT_GIVEN
return {
"type": "json_schema",
"json_schema": {
"name": "final_output",
"strict": final_output_schema.strict_json_schema,
"schema": final_output_schema.json_schema(),
},
}
@classmethod
def message_to_output_items(cls, message: ChatCompletionMessage) -> list[TResponseOutputItem]:
items: list[TResponseOutputItem] = []
message_item = ResponseOutputMessage(
id=FAKE_RESPONSES_ID,
content=[],
role="assistant",
type="message",
status="completed",
)
if message.content:
message_item.content.append(
ResponseOutputText(text=message.content, type="output_text", annotations=[])
)
if message.refusal:
message_item.content.append(
ResponseOutputRefusal(refusal=message.refusal, type="refusal")
)
if message.audio:
raise AgentsException("Audio is not currently supported")
if message_item.content:
items.append(message_item)
if message.tool_calls:
for tool_call in message.tool_calls:
items.append(
ResponseFunctionToolCall(
id=FAKE_RESPONSES_ID,
call_id=tool_call.id,
arguments=tool_call.function.arguments,
name=tool_call.function.name,
type="function_call",
)
)
return items
@classmethod
def maybe_easy_input_message(cls, item: Any) -> EasyInputMessageParam | None:
if not isinstance(item, dict):
return None
keys = item.keys()
# EasyInputMessageParam only has these two keys
if keys != {"content", "role"}:
return None
role = item.get("role", None)
if role not in ("user", "assistant", "system", "developer"):
return None
if "content" not in item:
return None
return cast(EasyInputMessageParam, item)
@classmethod
def maybe_input_message(cls, item: Any) -> Message | None:
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role")
in (
"user",
"system",
"developer",
)
):
return cast(Message, item)
return None
@classmethod
def maybe_file_search_call(cls, item: Any) -> ResponseFileSearchToolCallParam | None:
if isinstance(item, dict) and item.get("type") == "file_search_call":
return cast(ResponseFileSearchToolCallParam, item)
return None
@classmethod
def maybe_function_tool_call(cls, item: Any) -> ResponseFunctionToolCallParam | None:
if isinstance(item, dict) and item.get("type") == "function_call":
return cast(ResponseFunctionToolCallParam, item)
return None
@classmethod
def maybe_function_tool_call_output(
cls,
item: Any,
) -> FunctionCallOutput | None:
if isinstance(item, dict) and item.get("type") == "function_call_output":
return cast(FunctionCallOutput, item)
return None
@classmethod
def maybe_item_reference(cls, item: Any) -> ItemReference | None:
if isinstance(item, dict) and item.get("type") == "item_reference":
return cast(ItemReference, item)
return None
@classmethod
def maybe_response_output_message(cls, item: Any) -> ResponseOutputMessageParam | None:
# ResponseOutputMessage is only used for messages with role assistant
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role") == "assistant"
):
return cast(ResponseOutputMessageParam, item)
return None
@classmethod
def extract_text_content(
cls, content: str | Iterable[ResponseInputContentParam]
) -> str | list[ChatCompletionContentPartTextParam]:
all_content = cls.extract_all_content(content)
if isinstance(all_content, str):
return all_content
out: list[ChatCompletionContentPartTextParam] = []
for c in all_content:
if c.get("type") == "text":
out.append(cast(ChatCompletionContentPartTextParam, c))
return out
@classmethod
def extract_all_content(
cls, content: str | Iterable[ResponseInputContentParam]
) -> str | list[ChatCompletionContentPartParam]:
if isinstance(content, str):
return content
out: list[ChatCompletionContentPartParam] = []
for c in content:
if isinstance(c, dict) and c.get("type") == "input_text":
casted_text_param = cast(ResponseInputTextParam, c)
out.append(
ChatCompletionContentPartTextParam(
type="text",
text=casted_text_param["text"],
)
)
elif isinstance(c, dict) and c.get("type") == "input_image":
casted_image_param = cast(ResponseInputImageParam, c)
if "image_url" not in casted_image_param or not casted_image_param["image_url"]:
raise UserError(
f"Only image URLs are supported for input_image {casted_image_param}"
)
out.append(
ChatCompletionContentPartImageParam(
type="image_url",
image_url={
"url": casted_image_param["image_url"],
"detail": casted_image_param["detail"],
},
)
)
elif isinstance(c, dict) and c.get("type") == "input_file":
raise UserError(f"File uploads are not supported for chat completions {c}")
else:
raise UserError(f"Unknown content: {c}")
return out
@classmethod
def items_to_messages(
cls,
items: str | Iterable[TResponseInputItem],
) -> list[ChatCompletionMessageParam]:
"""
Convert a sequence of 'Item' objects into a list of ChatCompletionMessageParam.
Rules:
- EasyInputMessage or InputMessage (role=user) => ChatCompletionUserMessageParam
- EasyInputMessage or InputMessage (role=system) => ChatCompletionSystemMessageParam
- EasyInputMessage or InputMessage (role=developer) => ChatCompletionDeveloperMessageParam
- InputMessage (role=assistant) => Start or flush a ChatCompletionAssistantMessageParam
- response_output_message => Also produces/flushes a ChatCompletionAssistantMessageParam
- tool calls get attached to the *current* assistant message, or create one if none.
- tool outputs => ChatCompletionToolMessageParam
"""
if isinstance(items, str):
return [
ChatCompletionUserMessageParam(
role="user",
content=items,
)
]
result: list[ChatCompletionMessageParam] = []
current_assistant_msg: ChatCompletionAssistantMessageParam | None = None
def flush_assistant_message() -> None:
nonlocal current_assistant_msg
if current_assistant_msg is not None:
# The API doesn't support empty arrays for tool_calls
if not current_assistant_msg.get("tool_calls"):
del current_assistant_msg["tool_calls"]
result.append(current_assistant_msg)
current_assistant_msg = None
def ensure_assistant_message() -> ChatCompletionAssistantMessageParam:
nonlocal current_assistant_msg
if current_assistant_msg is None:
current_assistant_msg = ChatCompletionAssistantMessageParam(role="assistant")
current_assistant_msg["tool_calls"] = []
return current_assistant_msg
for item in items:
# 1) Check easy input message
if easy_msg := cls.maybe_easy_input_message(item):
role = easy_msg["role"]
content = easy_msg["content"]
if role == "user":
flush_assistant_message()
msg_user: ChatCompletionUserMessageParam = {
"role": "user",
"content": cls.extract_all_content(content),
}
result.append(msg_user)
elif role == "system":
flush_assistant_message()
msg_system: ChatCompletionSystemMessageParam = {
"role": "system",
"content": cls.extract_text_content(content),
}
result.append(msg_system)
elif role == "developer":
flush_assistant_message()
msg_developer: ChatCompletionDeveloperMessageParam = {
"role": "developer",
"content": cls.extract_text_content(content),
}
result.append(msg_developer)
elif role == "assistant":
flush_assistant_message()
msg_assistant: ChatCompletionAssistantMessageParam = {
"role": "assistant",
"content": cls.extract_text_content(content),
}
result.append(msg_assistant)
else:
raise UserError(f"Unexpected role in easy_input_message: {role}")
# 2) Check input message
elif in_msg := cls.maybe_input_message(item):
role = in_msg["role"]
content = in_msg["content"]
flush_assistant_message()
if role == "user":
msg_user = {
"role": "user",
"content": cls.extract_all_content(content),
}
result.append(msg_user)
elif role == "system":
msg_system = {
"role": "system",
"content": cls.extract_text_content(content),
}
result.append(msg_system)
elif role == "developer":
msg_developer = {
"role": "developer",
"content": cls.extract_text_content(content),
}
result.append(msg_developer)
else:
raise UserError(f"Unexpected role in input_message: {role}")
# 3) response output message => assistant
elif resp_msg := cls.maybe_response_output_message(item):
flush_assistant_message()
new_asst = ChatCompletionAssistantMessageParam(role="assistant")
contents = resp_msg["content"]
text_segments = []
for c in contents:
if c["type"] == "output_text":
text_segments.append(c["text"])
elif c["type"] == "refusal":
new_asst["refusal"] = c["refusal"]
elif c["type"] == "output_audio":
# Can't handle this, b/c chat completions expects an ID which we dont have
raise UserError(
f"Only audio IDs are supported for chat completions, but got: {c}"
)
else:
raise UserError(f"Unknown content type in ResponseOutputMessage: {c}")
if text_segments:
combined = "\n".join(text_segments)
new_asst["content"] = combined
new_asst["tool_calls"] = []
current_assistant_msg = new_asst
# 4) function/file-search calls => attach to assistant
elif file_search := cls.maybe_file_search_call(item):
asst = ensure_assistant_message()
tool_calls = list(asst.get("tool_calls", []))
new_tool_call = ChatCompletionMessageToolCallParam(
id=file_search["id"],
type="function",
function={
"name": "file_search_call",
"arguments": json.dumps(
{
"queries": file_search.get("queries", []),
"status": file_search.get("status"),
}
),
},
)
tool_calls.append(new_tool_call)
asst["tool_calls"] = tool_calls
elif func_call := cls.maybe_function_tool_call(item):
asst = ensure_assistant_message()
tool_calls = list(asst.get("tool_calls", []))
arguments = func_call["arguments"] if func_call["arguments"] else "{}"
new_tool_call = ChatCompletionMessageToolCallParam(
id=func_call["call_id"],
type="function",
function={
"name": func_call["name"],
"arguments": arguments,
},
)
tool_calls.append(new_tool_call)
asst["tool_calls"] = tool_calls
# 5) function call output => tool message
elif func_output := cls.maybe_function_tool_call_output(item):
flush_assistant_message()
msg: ChatCompletionToolMessageParam = {
"role": "tool",
"tool_call_id": func_output["call_id"],
"content": func_output["output"],
}
result.append(msg)
# 6) item reference => handle or raise
elif item_ref := cls.maybe_item_reference(item):
raise UserError(
f"Encountered an item_reference, which is not supported: {item_ref}"
)
# 7) If we haven't recognized it => fail or ignore
else:
raise UserError(f"Unhandled item type or structure: {item}")
flush_assistant_message()
return result
@classmethod
def tool_to_openai(cls, tool: Tool) -> ChatCompletionToolParam:
if isinstance(tool, FunctionTool):
return {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": tool.params_json_schema,
},
}
raise UserError(
f"Hosted tools are not supported with the ChatCompletions API. Got tool type: "
f"{type(tool)}, tool: {tool}"
)
@classmethod
def convert_handoff_tool(cls, handoff: Handoff[Any]) -> ChatCompletionToolParam:
return {
"type": "function",
"function": {
"name": handoff.tool_name,
"description": handoff.tool_description,
"parameters": handoff.input_json_schema,
},
}

View file

@ -3,71 +3,46 @@ from __future__ import annotations
import dataclasses
import json
import time
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal, cast, overload
from openai import NOT_GIVEN, AsyncOpenAI, AsyncStream, NotGiven
from openai import NOT_GIVEN, AsyncOpenAI, AsyncStream
from openai.types import ChatModel
from openai.types.chat import (
ChatCompletion,
ChatCompletionAssistantMessageParam,
ChatCompletionChunk,
ChatCompletionContentPartImageParam,
ChatCompletionContentPartParam,
ChatCompletionContentPartTextParam,
ChatCompletionDeveloperMessageParam,
ChatCompletionMessage,
ChatCompletionMessageParam,
ChatCompletionMessageToolCallParam,
ChatCompletionSystemMessageParam,
ChatCompletionToolChoiceOptionParam,
ChatCompletionToolMessageParam,
ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_tool_param import ChatCompletionToolParam
from openai.types.chat.completion_create_params import ResponseFormat
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai.types.completion_usage import CompletionUsage
from openai.types.responses import (
EasyInputMessageParam,
Response,
ResponseCompletedEvent,
ResponseContentPartAddedEvent,
ResponseContentPartDoneEvent,
ResponseCreatedEvent,
ResponseFileSearchToolCallParam,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionToolCall,
ResponseFunctionToolCallParam,
ResponseInputContentParam,
ResponseInputImageParam,
ResponseInputTextParam,
ResponseOutputItem,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseOutputMessage,
ResponseOutputMessageParam,
ResponseOutputRefusal,
ResponseOutputText,
ResponseRefusalDeltaEvent,
ResponseTextDeltaEvent,
ResponseUsage,
)
from openai.types.responses.response_input_param import FunctionCallOutput, ItemReference, Message
from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
from .. import _debug
from ..agent_output import AgentOutputSchema
from ..exceptions import AgentsException, UserError
from ..handoffs import Handoff
from ..items import ModelResponse, TResponseInputItem, TResponseOutputItem, TResponseStreamEvent
from ..items import ModelResponse, TResponseInputItem, TResponseStreamEvent
from ..logger import logger
from ..tool import FunctionTool, Tool
from ..tool import Tool
from ..tracing import generation_span
from ..tracing.span_data import GenerationSpanData
from ..tracing.spans import Span
from ..usage import Usage
from ..version import __version__
from .chatcmpl_converter import Converter
from .fake_id import FAKE_RESPONSES_ID
from .interface import Model, ModelTracing
@ -152,7 +127,7 @@ class OpenAIChatCompletionsModel(Model):
"output_tokens": usage.output_tokens,
}
items = _Converter.message_to_output_items(response.choices[0].message)
items = Converter.message_to_output_items(response.choices[0].message)
return ModelResponse(
output=items,
@ -486,7 +461,7 @@ class OpenAIChatCompletionsModel(Model):
tracing: ModelTracing,
stream: bool = False,
) -> ChatCompletion | tuple[Response, AsyncStream[ChatCompletionChunk]]:
converted_messages = _Converter.items_to_messages(input)
converted_messages = Converter.items_to_messages(input)
if system_instructions:
converted_messages.insert(
@ -506,13 +481,13 @@ class OpenAIChatCompletionsModel(Model):
if model_settings.parallel_tool_calls is False
else NOT_GIVEN
)
tool_choice = _Converter.convert_tool_choice(model_settings.tool_choice)
response_format = _Converter.convert_response_format(output_schema)
tool_choice = Converter.convert_tool_choice(model_settings.tool_choice)
response_format = Converter.convert_response_format(output_schema)
converted_tools = [ToolConverter.to_openai(tool) for tool in tools] if tools else []
converted_tools = [Converter.tool_to_openai(tool) for tool in tools] if tools else []
for handoff in handoffs:
converted_tools.append(ToolConverter.convert_handoff_tool(handoff))
converted_tools.append(Converter.convert_handoff_tool(handoff))
if _debug.DONT_LOG_MODEL_DATA:
logger.debug("Calling LLM")
@ -526,9 +501,9 @@ class OpenAIChatCompletionsModel(Model):
)
reasoning_effort = model_settings.reasoning.effort if model_settings.reasoning else None
store = _Converter.get_store_param(self._get_client(), model_settings)
store = _Helpers.get_store_param(self._get_client(), model_settings)
stream_options = _Converter.get_stream_options_param(
stream_options = _Helpers.get_stream_options_param(
self._get_client(), model_settings, stream=stream
)
@ -580,7 +555,7 @@ class OpenAIChatCompletionsModel(Model):
return self._client
class _Converter:
class _Helpers:
@classmethod
def is_openai(cls, client: AsyncOpenAI):
return str(client.base_url).startswith("https://api.openai.com")
@ -606,425 +581,3 @@ class _Converter:
)
stream_options = {"include_usage": include_usage} if include_usage is not None else None
return stream_options
@classmethod
def convert_tool_choice(
cls, tool_choice: Literal["auto", "required", "none"] | str | None
) -> ChatCompletionToolChoiceOptionParam | NotGiven:
if tool_choice is None:
return NOT_GIVEN
elif tool_choice == "auto":
return "auto"
elif tool_choice == "required":
return "required"
elif tool_choice == "none":
return "none"
else:
return {
"type": "function",
"function": {
"name": tool_choice,
},
}
@classmethod
def convert_response_format(
cls, final_output_schema: AgentOutputSchema | None
) -> ResponseFormat | NotGiven:
if not final_output_schema or final_output_schema.is_plain_text():
return NOT_GIVEN
return {
"type": "json_schema",
"json_schema": {
"name": "final_output",
"strict": final_output_schema.strict_json_schema,
"schema": final_output_schema.json_schema(),
},
}
@classmethod
def message_to_output_items(cls, message: ChatCompletionMessage) -> list[TResponseOutputItem]:
items: list[TResponseOutputItem] = []
message_item = ResponseOutputMessage(
id=FAKE_RESPONSES_ID,
content=[],
role="assistant",
type="message",
status="completed",
)
if message.content:
message_item.content.append(
ResponseOutputText(text=message.content, type="output_text", annotations=[])
)
if message.refusal:
message_item.content.append(
ResponseOutputRefusal(refusal=message.refusal, type="refusal")
)
if message.audio:
raise AgentsException("Audio is not currently supported")
if message_item.content:
items.append(message_item)
if message.tool_calls:
for tool_call in message.tool_calls:
items.append(
ResponseFunctionToolCall(
id=FAKE_RESPONSES_ID,
call_id=tool_call.id,
arguments=tool_call.function.arguments,
name=tool_call.function.name,
type="function_call",
)
)
return items
@classmethod
def maybe_easy_input_message(cls, item: Any) -> EasyInputMessageParam | None:
if not isinstance(item, dict):
return None
keys = item.keys()
# EasyInputMessageParam only has these two keys
if keys != {"content", "role"}:
return None
role = item.get("role", None)
if role not in ("user", "assistant", "system", "developer"):
return None
if "content" not in item:
return None
return cast(EasyInputMessageParam, item)
@classmethod
def maybe_input_message(cls, item: Any) -> Message | None:
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role")
in (
"user",
"system",
"developer",
)
):
return cast(Message, item)
return None
@classmethod
def maybe_file_search_call(cls, item: Any) -> ResponseFileSearchToolCallParam | None:
if isinstance(item, dict) and item.get("type") == "file_search_call":
return cast(ResponseFileSearchToolCallParam, item)
return None
@classmethod
def maybe_function_tool_call(cls, item: Any) -> ResponseFunctionToolCallParam | None:
if isinstance(item, dict) and item.get("type") == "function_call":
return cast(ResponseFunctionToolCallParam, item)
return None
@classmethod
def maybe_function_tool_call_output(
cls,
item: Any,
) -> FunctionCallOutput | None:
if isinstance(item, dict) and item.get("type") == "function_call_output":
return cast(FunctionCallOutput, item)
return None
@classmethod
def maybe_item_reference(cls, item: Any) -> ItemReference | None:
if isinstance(item, dict) and item.get("type") == "item_reference":
return cast(ItemReference, item)
return None
@classmethod
def maybe_response_output_message(cls, item: Any) -> ResponseOutputMessageParam | None:
# ResponseOutputMessage is only used for messages with role assistant
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role") == "assistant"
):
return cast(ResponseOutputMessageParam, item)
return None
@classmethod
def extract_text_content(
cls, content: str | Iterable[ResponseInputContentParam]
) -> str | list[ChatCompletionContentPartTextParam]:
all_content = cls.extract_all_content(content)
if isinstance(all_content, str):
return all_content
out: list[ChatCompletionContentPartTextParam] = []
for c in all_content:
if c.get("type") == "text":
out.append(cast(ChatCompletionContentPartTextParam, c))
return out
@classmethod
def extract_all_content(
cls, content: str | Iterable[ResponseInputContentParam]
) -> str | list[ChatCompletionContentPartParam]:
if isinstance(content, str):
return content
out: list[ChatCompletionContentPartParam] = []
for c in content:
if isinstance(c, dict) and c.get("type") == "input_text":
casted_text_param = cast(ResponseInputTextParam, c)
out.append(
ChatCompletionContentPartTextParam(
type="text",
text=casted_text_param["text"],
)
)
elif isinstance(c, dict) and c.get("type") == "input_image":
casted_image_param = cast(ResponseInputImageParam, c)
if "image_url" not in casted_image_param or not casted_image_param["image_url"]:
raise UserError(
f"Only image URLs are supported for input_image {casted_image_param}"
)
out.append(
ChatCompletionContentPartImageParam(
type="image_url",
image_url={
"url": casted_image_param["image_url"],
"detail": casted_image_param["detail"],
},
)
)
elif isinstance(c, dict) and c.get("type") == "input_file":
raise UserError(f"File uploads are not supported for chat completions {c}")
else:
raise UserError(f"Unknown content: {c}")
return out
@classmethod
def items_to_messages(
cls,
items: str | Iterable[TResponseInputItem],
) -> list[ChatCompletionMessageParam]:
"""
Convert a sequence of 'Item' objects into a list of ChatCompletionMessageParam.
Rules:
- EasyInputMessage or InputMessage (role=user) => ChatCompletionUserMessageParam
- EasyInputMessage or InputMessage (role=system) => ChatCompletionSystemMessageParam
- EasyInputMessage or InputMessage (role=developer) => ChatCompletionDeveloperMessageParam
- InputMessage (role=assistant) => Start or flush a ChatCompletionAssistantMessageParam
- response_output_message => Also produces/flushes a ChatCompletionAssistantMessageParam
- tool calls get attached to the *current* assistant message, or create one if none.
- tool outputs => ChatCompletionToolMessageParam
"""
if isinstance(items, str):
return [
ChatCompletionUserMessageParam(
role="user",
content=items,
)
]
result: list[ChatCompletionMessageParam] = []
current_assistant_msg: ChatCompletionAssistantMessageParam | None = None
def flush_assistant_message() -> None:
nonlocal current_assistant_msg
if current_assistant_msg is not None:
# The API doesn't support empty arrays for tool_calls
if not current_assistant_msg.get("tool_calls"):
del current_assistant_msg["tool_calls"]
result.append(current_assistant_msg)
current_assistant_msg = None
def ensure_assistant_message() -> ChatCompletionAssistantMessageParam:
nonlocal current_assistant_msg
if current_assistant_msg is None:
current_assistant_msg = ChatCompletionAssistantMessageParam(role="assistant")
current_assistant_msg["tool_calls"] = []
return current_assistant_msg
for item in items:
# 1) Check easy input message
if easy_msg := cls.maybe_easy_input_message(item):
role = easy_msg["role"]
content = easy_msg["content"]
if role == "user":
flush_assistant_message()
msg_user: ChatCompletionUserMessageParam = {
"role": "user",
"content": cls.extract_all_content(content),
}
result.append(msg_user)
elif role == "system":
flush_assistant_message()
msg_system: ChatCompletionSystemMessageParam = {
"role": "system",
"content": cls.extract_text_content(content),
}
result.append(msg_system)
elif role == "developer":
flush_assistant_message()
msg_developer: ChatCompletionDeveloperMessageParam = {
"role": "developer",
"content": cls.extract_text_content(content),
}
result.append(msg_developer)
elif role == "assistant":
flush_assistant_message()
msg_assistant: ChatCompletionAssistantMessageParam = {
"role": "assistant",
"content": cls.extract_text_content(content),
}
result.append(msg_assistant)
else:
raise UserError(f"Unexpected role in easy_input_message: {role}")
# 2) Check input message
elif in_msg := cls.maybe_input_message(item):
role = in_msg["role"]
content = in_msg["content"]
flush_assistant_message()
if role == "user":
msg_user = {
"role": "user",
"content": cls.extract_all_content(content),
}
result.append(msg_user)
elif role == "system":
msg_system = {
"role": "system",
"content": cls.extract_text_content(content),
}
result.append(msg_system)
elif role == "developer":
msg_developer = {
"role": "developer",
"content": cls.extract_text_content(content),
}
result.append(msg_developer)
else:
raise UserError(f"Unexpected role in input_message: {role}")
# 3) response output message => assistant
elif resp_msg := cls.maybe_response_output_message(item):
flush_assistant_message()
new_asst = ChatCompletionAssistantMessageParam(role="assistant")
contents = resp_msg["content"]
text_segments = []
for c in contents:
if c["type"] == "output_text":
text_segments.append(c["text"])
elif c["type"] == "refusal":
new_asst["refusal"] = c["refusal"]
elif c["type"] == "output_audio":
# Can't handle this, b/c chat completions expects an ID which we dont have
raise UserError(
f"Only audio IDs are supported for chat completions, but got: {c}"
)
else:
raise UserError(f"Unknown content type in ResponseOutputMessage: {c}")
if text_segments:
combined = "\n".join(text_segments)
new_asst["content"] = combined
new_asst["tool_calls"] = []
current_assistant_msg = new_asst
# 4) function/file-search calls => attach to assistant
elif file_search := cls.maybe_file_search_call(item):
asst = ensure_assistant_message()
tool_calls = list(asst.get("tool_calls", []))
new_tool_call = ChatCompletionMessageToolCallParam(
id=file_search["id"],
type="function",
function={
"name": "file_search_call",
"arguments": json.dumps(
{
"queries": file_search.get("queries", []),
"status": file_search.get("status"),
}
),
},
)
tool_calls.append(new_tool_call)
asst["tool_calls"] = tool_calls
elif func_call := cls.maybe_function_tool_call(item):
asst = ensure_assistant_message()
tool_calls = list(asst.get("tool_calls", []))
arguments = func_call["arguments"] if func_call["arguments"] else "{}"
new_tool_call = ChatCompletionMessageToolCallParam(
id=func_call["call_id"],
type="function",
function={
"name": func_call["name"],
"arguments": arguments,
},
)
tool_calls.append(new_tool_call)
asst["tool_calls"] = tool_calls
# 5) function call output => tool message
elif func_output := cls.maybe_function_tool_call_output(item):
flush_assistant_message()
msg: ChatCompletionToolMessageParam = {
"role": "tool",
"tool_call_id": func_output["call_id"],
"content": func_output["output"],
}
result.append(msg)
# 6) item reference => handle or raise
elif item_ref := cls.maybe_item_reference(item):
raise UserError(
f"Encountered an item_reference, which is not supported: {item_ref}"
)
# 7) If we haven't recognized it => fail or ignore
else:
raise UserError(f"Unhandled item type or structure: {item}")
flush_assistant_message()
return result
class ToolConverter:
@classmethod
def to_openai(cls, tool: Tool) -> ChatCompletionToolParam:
if isinstance(tool, FunctionTool):
return {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": tool.params_json_schema,
},
}
raise UserError(
f"Hosted tools are not supported with the ChatCompletions API. Got tool type: "
f"{type(tool)}, tool: {tool}"
)
@classmethod
def convert_handoff_tool(cls, handoff: Handoff[Any]) -> ChatCompletionToolParam:
return {
"type": "function",
"function": {
"name": handoff.tool_name,
"description": handoff.tool_description,
"parameters": handoff.input_json_schema,
},
}

View file

@ -31,7 +31,7 @@ from agents import (
generation_span,
)
from agents.models.fake_id import FAKE_RESPONSES_ID
from agents.models.openai_chatcompletions import _Converter
from agents.models.openai_chatcompletions import _Helpers
@pytest.mark.allow_call_model_methods
@ -301,32 +301,32 @@ def test_store_param():
model_settings = ModelSettings()
client = AsyncOpenAI()
assert _Converter.get_store_param(client, model_settings) is True, (
assert _Helpers.get_store_param(client, model_settings) is True, (
"Should default to True for OpenAI API calls"
)
model_settings = ModelSettings(store=False)
assert _Converter.get_store_param(client, model_settings) is False, (
assert _Helpers.get_store_param(client, model_settings) is False, (
"Should respect explicitly set store=False"
)
model_settings = ModelSettings(store=True)
assert _Converter.get_store_param(client, model_settings) is True, (
assert _Helpers.get_store_param(client, model_settings) is True, (
"Should respect explicitly set store=True"
)
client = AsyncOpenAI(base_url="http://www.notopenai.com")
model_settings = ModelSettings()
assert _Converter.get_store_param(client, model_settings) is None, (
assert _Helpers.get_store_param(client, model_settings) is None, (
"Should default to None for non-OpenAI API calls"
)
model_settings = ModelSettings(store=False)
assert _Converter.get_store_param(client, model_settings) is False, (
assert _Helpers.get_store_param(client, model_settings) is False, (
"Should respect explicitly set store=False"
)
model_settings = ModelSettings(store=True)
assert _Converter.get_store_param(client, model_settings) is True, (
assert _Helpers.get_store_param(client, model_settings) is True, (
"Should respect explicitly set store=True"
)

View file

@ -4,7 +4,7 @@
# See LICENSE file in the project root for full license information.
"""
Unit tests for the internal `_Converter` class defined in
Unit tests for the internal `Converter` class defined in
`agents.models.openai_chatcompletions`. The converter is responsible for
translating between internal "item" structures (e.g., `ResponseOutputMessage`
and related types from `openai.types.responses`) and the ChatCompletion message
@ -12,10 +12,10 @@ structures defined by the OpenAI client library.
These tests exercise both conversion directions:
- `_Converter.message_to_output_items` turns a `ChatCompletionMessage` (as
- `Converter.message_to_output_items` turns a `ChatCompletionMessage` (as
returned by the OpenAI API) into a list of `ResponseOutputItem` instances.
- `_Converter.items_to_messages` takes in either a simple string prompt, or a
- `Converter.items_to_messages` takes in either a simple string prompt, or a
list of input/output items such as `ResponseOutputMessage` and
`ResponseFunctionToolCallParam` dicts, and constructs a list of
`ChatCompletionMessageParam` dicts suitable for sending back to the API.
@ -41,8 +41,8 @@ from openai.types.responses.response_input_item_param import FunctionCallOutput
from agents.agent_output import AgentOutputSchema
from agents.exceptions import UserError
from agents.items import TResponseInputItem
from agents.models.chatcmpl_converter import Converter
from agents.models.fake_id import FAKE_RESPONSES_ID
from agents.models.openai_chatcompletions import _Converter
def test_message_to_output_items_with_text_only():
@ -51,7 +51,7 @@ def test_message_to_output_items_with_text_only():
into a single ResponseOutputMessage containing one ResponseOutputText.
"""
msg = ChatCompletionMessage(role="assistant", content="Hello")
items = _Converter.message_to_output_items(msg)
items = Converter.message_to_output_items(msg)
# Expect exactly one output item (the message)
assert len(items) == 1
message_item = cast(ResponseOutputMessage, items[0])
@ -72,7 +72,7 @@ def test_message_to_output_items_with_refusal():
with a ResponseOutputRefusal content part.
"""
msg = ChatCompletionMessage(role="assistant", refusal="I'm sorry")
items = _Converter.message_to_output_items(msg)
items = Converter.message_to_output_items(msg)
assert len(items) == 1
message_item = cast(ResponseOutputMessage, items[0])
assert len(message_item.content) == 1
@ -93,7 +93,7 @@ def test_message_to_output_items_with_tool_call():
function=Function(name="myfn", arguments='{"x":1}'),
)
msg = ChatCompletionMessage(role="assistant", content="Hi", tool_calls=[tool_call])
items = _Converter.message_to_output_items(msg)
items = Converter.message_to_output_items(msg)
# Should produce a message item followed by one function tool call item
assert len(items) == 2
message_item = cast(ResponseOutputMessage, items[0])
@ -111,7 +111,7 @@ def test_items_to_messages_with_string_user_content():
A simple string as the items argument should be converted into a user
message param dict with the same content.
"""
result = _Converter.items_to_messages("Ask me anything")
result = Converter.items_to_messages("Ask me anything")
assert isinstance(result, list)
assert len(result) == 1
msg = result[0]
@ -130,7 +130,7 @@ def test_items_to_messages_with_easy_input_message():
"content": "How are you?",
}
]
messages = _Converter.items_to_messages(items)
messages = Converter.items_to_messages(items)
assert len(messages) == 1
out = messages[0]
assert out["role"] == "user"
@ -174,7 +174,7 @@ def test_items_to_messages_with_output_message_and_function_call():
resp_msg.model_dump(), # type:ignore
func_item,
]
messages = _Converter.items_to_messages(items)
messages = Converter.items_to_messages(items)
# Should return a single assistant message
assert len(messages) == 1
assistant = messages[0]
@ -197,16 +197,16 @@ def test_items_to_messages_with_output_message_and_function_call():
def test_convert_tool_choice_handles_standard_and_named_options() -> None:
"""
The `_Converter.convert_tool_choice` method should return NOT_GIVEN
The `Converter.convert_tool_choice` method should return NOT_GIVEN
if no choice is provided, pass through values like "auto", "required",
or "none" unchanged, and translate any other string into a function
selection dict.
"""
assert _Converter.convert_tool_choice(None).__class__.__name__ == "NotGiven"
assert _Converter.convert_tool_choice("auto") == "auto"
assert _Converter.convert_tool_choice("required") == "required"
assert _Converter.convert_tool_choice("none") == "none"
tool_choice_dict = _Converter.convert_tool_choice("mytool")
assert Converter.convert_tool_choice(None).__class__.__name__ == "NotGiven"
assert Converter.convert_tool_choice("auto") == "auto"
assert Converter.convert_tool_choice("required") == "required"
assert Converter.convert_tool_choice("none") == "none"
tool_choice_dict = Converter.convert_tool_choice("mytool")
assert isinstance(tool_choice_dict, dict)
assert tool_choice_dict["type"] == "function"
assert tool_choice_dict["function"]["name"] == "mytool"
@ -214,20 +214,20 @@ def test_convert_tool_choice_handles_standard_and_named_options() -> None:
def test_convert_response_format_returns_not_given_for_plain_text_and_dict_for_schemas() -> None:
"""
The `_Converter.convert_response_format` method should return NOT_GIVEN
The `Converter.convert_response_format` method should return NOT_GIVEN
when no output schema is provided or if the output schema indicates
plain text. For structured output schemas, it should return a dict
with type `json_schema` and include the generated JSON schema and
strict flag from the provided `AgentOutputSchema`.
"""
# when output is plain text (schema None or output_type str), do not include response_format
assert _Converter.convert_response_format(None).__class__.__name__ == "NotGiven"
assert Converter.convert_response_format(None).__class__.__name__ == "NotGiven"
assert (
_Converter.convert_response_format(AgentOutputSchema(str)).__class__.__name__ == "NotGiven"
Converter.convert_response_format(AgentOutputSchema(str)).__class__.__name__ == "NotGiven"
)
# For e.g. integer output, we expect a response_format dict
schema = AgentOutputSchema(int)
resp_format = _Converter.convert_response_format(schema)
resp_format = Converter.convert_response_format(schema)
assert isinstance(resp_format, dict)
assert resp_format["type"] == "json_schema"
assert resp_format["json_schema"]["name"] == "final_output"
@ -247,7 +247,7 @@ def test_items_to_messages_with_function_output_item():
"call_id": "somecall",
"output": '{"foo": "bar"}',
}
messages = _Converter.items_to_messages([func_output_item])
messages = Converter.items_to_messages([func_output_item])
assert len(messages) == 1
tool_msg = messages[0]
assert tool_msg["role"] == "tool"
@ -266,16 +266,16 @@ def test_extract_all_and_text_content_for_strings_and_lists():
should filter to only the textual parts.
"""
prompt = "just text"
assert _Converter.extract_all_content(prompt) == prompt
assert _Converter.extract_text_content(prompt) == prompt
assert Converter.extract_all_content(prompt) == prompt
assert Converter.extract_text_content(prompt) == prompt
text1: ResponseInputTextParam = {"type": "input_text", "text": "one"}
text2: ResponseInputTextParam = {"type": "input_text", "text": "two"}
all_parts = _Converter.extract_all_content([text1, text2])
all_parts = Converter.extract_all_content([text1, text2])
assert isinstance(all_parts, list)
assert len(all_parts) == 2
assert all_parts[0]["type"] == "text" and all_parts[0]["text"] == "one"
assert all_parts[1]["type"] == "text" and all_parts[1]["text"] == "two"
text_parts = _Converter.extract_text_content([text1, text2])
text_parts = Converter.extract_text_content([text1, text2])
assert isinstance(text_parts, list)
assert all(p["type"] == "text" for p in text_parts)
assert [p["text"] for p in text_parts] == ["one", "two"]
@ -288,12 +288,12 @@ def test_items_to_messages_handles_system_and_developer_roles():
`message` typed dicts.
"""
sys_items: list[TResponseInputItem] = [{"role": "system", "content": "setup"}]
sys_msgs = _Converter.items_to_messages(sys_items)
sys_msgs = Converter.items_to_messages(sys_items)
assert len(sys_msgs) == 1
assert sys_msgs[0]["role"] == "system"
assert sys_msgs[0]["content"] == "setup"
dev_items: list[TResponseInputItem] = [{"role": "developer", "content": "debug"}]
dev_msgs = _Converter.items_to_messages(dev_items)
dev_msgs = Converter.items_to_messages(dev_items)
assert len(dev_msgs) == 1
assert dev_msgs[0]["role"] == "developer"
assert dev_msgs[0]["content"] == "debug"
@ -301,7 +301,7 @@ def test_items_to_messages_handles_system_and_developer_roles():
def test_maybe_input_message_allows_message_typed_dict():
"""
The `_Converter.maybe_input_message` should recognize a dict with
The `Converter.maybe_input_message` should recognize a dict with
"type": "message" and a supported role as an input message. Ensure
that such dicts are passed through by `items_to_messages`.
"""
@ -311,9 +311,9 @@ def test_maybe_input_message_allows_message_typed_dict():
"role": "user",
"content": "hi",
}
assert _Converter.maybe_input_message(message_dict) is not None
assert Converter.maybe_input_message(message_dict) is not None
# items_to_messages should process this correctly
msgs = _Converter.items_to_messages([message_dict])
msgs = Converter.items_to_messages([message_dict])
assert len(msgs) == 1
assert msgs[0]["role"] == "user"
assert msgs[0]["content"] == "hi"
@ -331,7 +331,7 @@ def test_tool_call_conversion():
type="function_call",
)
messages = _Converter.items_to_messages([function_call])
messages = Converter.items_to_messages([function_call])
assert len(messages) == 1
tool_msg = messages[0]
assert tool_msg["role"] == "assistant"
@ -348,7 +348,7 @@ def test_tool_call_conversion():
@pytest.mark.parametrize("role", ["user", "system", "developer"])
def test_input_message_with_all_roles(role: str):
"""
The `_Converter.maybe_input_message` should recognize a dict with
The `Converter.maybe_input_message` should recognize a dict with
"type": "message" and a supported role as an input message. Ensure
that such dicts are passed through by `items_to_messages`.
"""
@ -359,9 +359,9 @@ def test_input_message_with_all_roles(role: str):
"role": casted_role,
"content": "hi",
}
assert _Converter.maybe_input_message(message_dict) is not None
assert Converter.maybe_input_message(message_dict) is not None
# items_to_messages should process this correctly
msgs = _Converter.items_to_messages([message_dict])
msgs = Converter.items_to_messages([message_dict])
assert len(msgs) == 1
assert msgs[0]["role"] == casted_role
assert msgs[0]["content"] == "hi"
@ -372,7 +372,7 @@ def test_item_reference_errors():
Test that item references are converted correctly.
"""
with pytest.raises(UserError):
_Converter.items_to_messages(
Converter.items_to_messages(
[
{
"type": "item_reference",
@ -392,14 +392,14 @@ def test_unknown_object_errors():
"""
with pytest.raises(UserError, match="Unhandled item type or structure"):
# Purposely ignore the type error
_Converter.items_to_messages([TestObject()]) # type: ignore
Converter.items_to_messages([TestObject()]) # type: ignore
def test_assistant_messages_in_history():
"""
Test that assistant messages are added to the history.
"""
messages = _Converter.items_to_messages(
messages = Converter.items_to_messages(
[
{
"role": "user",

View file

@ -3,7 +3,7 @@ from pydantic import BaseModel
from agents import Agent, Handoff, function_tool, handoff
from agents.exceptions import UserError
from agents.models.openai_chatcompletions import ToolConverter
from agents.models.chatcmpl_converter import Converter
from agents.tool import FileSearchTool, WebSearchTool
@ -15,7 +15,7 @@ def test_to_openai_with_function_tool():
some_function(a="foo", b=[1, 2, 3])
tool = function_tool(some_function)
result = ToolConverter.to_openai(tool)
result = Converter.tool_to_openai(tool)
assert result["type"] == "function"
assert result["function"]["name"] == "some_function"
@ -34,7 +34,7 @@ class Foo(BaseModel):
def test_convert_handoff_tool():
agent = Agent(name="test_1", handoff_description="test_2")
handoff_obj = handoff(agent=agent)
result = ToolConverter.convert_handoff_tool(handoff_obj)
result = Converter.convert_handoff_tool(handoff_obj)
assert result["type"] == "function"
assert result["function"]["name"] == Handoff.default_tool_name(agent)
@ -48,7 +48,7 @@ def test_convert_handoff_tool():
def test_tool_converter_hosted_tools_errors():
with pytest.raises(UserError):
ToolConverter.to_openai(WebSearchTool())
Converter.tool_to_openai(WebSearchTool())
with pytest.raises(UserError):
ToolConverter.to_openai(FileSearchTool(vector_store_ids=["abc"], max_num_results=1))
Converter.tool_to_openai(FileSearchTool(vector_store_ids=["abc"], max_num_results=1))