[5/n] MCP tracing

## Summary:

Adds tracing and tests for tracing.
- Tools are added to the agents
- Theres a span for the mcp tools lookup
- Functions have MCP data

## Test Plan:

Unit tests
.
This commit is contained in:
Rohan Mehta 2025-03-25 18:01:00 -04:00
parent ad020b73b5
commit 010022777b
13 changed files with 352 additions and 22 deletions

View file

@ -2,7 +2,7 @@ import asyncio
import os
import shutil
from agents import Agent, Runner, trace
from agents import Agent, Runner, gen_trace_id, trace
from agents.mcp import MCPServer, MCPServerStdio
@ -37,12 +37,15 @@ async def main():
samples_dir = os.path.join(current_dir, "sample_files")
async with MCPServerStdio(
name="Filesystem Server, via npx",
params={
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", samples_dir],
}
},
) as server:
with trace(workflow_name="MCP Filesystem Example"):
trace_id = gen_trace_id()
with trace(workflow_name="MCP Filesystem Example", trace_id=trace_id):
print(f"View trace: https://platform.openai.com/traces/{trace_id}\n")
await run(server)

View file

@ -70,6 +70,7 @@ from .tracing import (
GenerationSpanData,
GuardrailSpanData,
HandoffSpanData,
MCPListToolsSpanData,
Span,
SpanData,
SpanError,
@ -89,6 +90,7 @@ from .tracing import (
get_current_trace,
guardrail_span,
handoff_span,
mcp_tools_span,
set_trace_processors,
set_tracing_disabled,
set_tracing_export_api_key,
@ -220,6 +222,7 @@ __all__ = [
"speech_group_span",
"transcription_span",
"speech_span",
"mcp_tools_span",
"trace",
"Trace",
"TracingProcessor",
@ -234,6 +237,7 @@ __all__ = [
"HandoffSpanData",
"SpeechGroupSpanData",
"SpeechSpanData",
"MCPListToolsSpanData",
"TranscriptionSpanData",
"set_default_openai_key",
"set_default_openai_client",

View file

@ -228,4 +228,5 @@ class Agent(Generic[TContext]):
async def get_all_tools(self) -> list[Tool]:
"""All agent tools, including MCP tools and function tools."""
return await MCPUtil.get_all_function_tools(self.mcp_servers) + self.tools
mcp_tools = await self.get_mcp_tools()
return mcp_tools + self.tools

View file

@ -27,6 +27,12 @@ class MCPServer(abc.ABC):
"""
pass
@property
@abc.abstractmethod
def name(self) -> str:
"""A readable name for the server."""
pass
@abc.abstractmethod
async def cleanup(self):
"""Cleanup the server. For example, this might mean closing a subprocess or
@ -171,7 +177,12 @@ class MCPServerStdio(_MCPServerWithClientSession):
details.
"""
def __init__(self, params: MCPServerStdioParams, cache_tools_list: bool = False):
def __init__(
self,
params: MCPServerStdioParams,
cache_tools_list: bool = False,
name: str | None = None,
):
"""Create a new MCP server based on the stdio transport.
Args:
@ -185,6 +196,8 @@ class MCPServerStdio(_MCPServerWithClientSession):
invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
if you know the server will not change its tools list, because it can drastically
improve latency (by avoiding a round-trip to the server every time).
name: A readable name for the server. If not provided, we'll create one from the
command.
"""
super().__init__(cache_tools_list)
@ -197,6 +210,8 @@ class MCPServerStdio(_MCPServerWithClientSession):
encoding_error_handler=params.get("encoding_error_handler", "strict"),
)
self._name = name or f"stdio: {self.params.command}"
def create_streams(
self,
) -> AbstractAsyncContextManager[
@ -208,6 +223,11 @@ class MCPServerStdio(_MCPServerWithClientSession):
"""Create the streams for the server."""
return stdio_client(self.params)
@property
def name(self) -> str:
"""A readable name for the server."""
return self._name
class MCPServerSseParams(TypedDict):
"""Mirrors the params in`mcp.client.sse.sse_client`."""
@ -231,7 +251,12 @@ class MCPServerSse(_MCPServerWithClientSession):
for details.
"""
def __init__(self, params: MCPServerSseParams, cache_tools_list: bool = False):
def __init__(
self,
params: MCPServerSseParams,
cache_tools_list: bool = False,
name: str | None = None,
):
"""Create a new MCP server based on the HTTP with SSE transport.
Args:
@ -245,10 +270,14 @@ class MCPServerSse(_MCPServerWithClientSession):
invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
if you know the server will not change its tools list, because it can drastically
improve latency (by avoiding a round-trip to the server every time).
name: A readable name for the server. If not provided, we'll create one from the
URL.
"""
super().__init__(cache_tools_list)
self.params = params
self._name = name or f"sse: {self.params['url']}"
def create_streams(
self,
@ -265,3 +294,8 @@ class MCPServerSse(_MCPServerWithClientSession):
timeout=self.params.get("timeout", 5),
sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
)
@property
def name(self) -> str:
"""A readable name for the server."""
return self._name

View file

@ -7,6 +7,7 @@ from ..exceptions import AgentsException, ModelBehaviorError, UserError
from ..logger import logger
from ..run_context import RunContextWrapper
from ..tool import FunctionTool, Tool
from ..tracing import FunctionSpanData, get_current_span, mcp_tools_span
if TYPE_CHECKING:
from mcp.types import Tool as MCPTool
@ -38,7 +39,11 @@ class MCPUtil:
@classmethod
async def get_function_tools(cls, server: "MCPServer") -> list[Tool]:
"""Get all function tools from a single MCP server."""
tools = await server.list_tools()
with mcp_tools_span(server=server.name) as span:
tools = await server.list_tools()
span.span_data.result = [tool.name for tool in tools]
return [cls.to_function_tool(tool, server) for tool in tools]
@classmethod
@ -88,9 +93,23 @@ class MCPUtil:
# The MCP tool result is a list of content items, whereas OpenAI tool outputs are a single
# string. We'll try to convert.
if len(result.content) == 1:
return result.content[0].model_dump_json()
tool_output = result.content[0].model_dump_json()
elif len(result.content) > 1:
return json.dumps([item.model_dump() for item in result.content])
tool_output = json.dumps([item.model_dump() for item in result.content])
else:
logger.error(f"Errored MCP tool result: {result}")
return "Error running tool."
tool_output = "Error running tool."
current_span = get_current_span()
if current_span:
if isinstance(current_span.span_data, FunctionSpanData):
current_span.span_data.output = tool_output
current_span.span_data.mcp_data = {
"server": server.name,
}
else:
logger.warning(
f"Current span is not a FunctionSpanData, skipping tool output: {current_span}"
)
return tool_output

View file

@ -7,8 +7,6 @@ from typing import Any, cast
from openai.types.responses import ResponseCompletedEvent
from agents.tool import Tool
from ._run_impl import (
AgentToolUseTracker,
NextStepFinalOutput,
@ -40,6 +38,7 @@ from .models.openai_provider import OpenAIProvider
from .result import RunResult, RunResultStreaming
from .run_context import RunContextWrapper, TContext
from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent
from .tool import Tool
from .tracing import Span, SpanError, agent_span, get_current_trace, trace
from .tracing.span_data import AgentSpanData
from .usage import Usage
@ -182,8 +181,6 @@ class Runner:
# agent changes, or if the agent loop ends.
if current_span is None:
handoff_names = [h.agent_name for h in cls._get_handoffs(current_agent)]
all_tools = await cls._get_all_tools(current_agent)
tool_names = [t.name for t in all_tools]
if output_schema := cls._get_output_schema(current_agent):
output_type_name = output_schema.output_type_name()
else:
@ -192,11 +189,13 @@ class Runner:
current_span = agent_span(
name=current_agent.name,
handoffs=handoff_names,
tools=tool_names,
output_type=output_type_name,
)
current_span.start(mark_as_current=True)
all_tools = await cls._get_all_tools(current_agent)
current_span.span_data.tools = [t.name for t in all_tools]
current_turn += 1
if current_turn > max_turns:
_error_tracing.attach_error_to_span(
@ -504,7 +503,6 @@ class Runner:
# agent changes, or if the agent loop ends.
if current_span is None:
handoff_names = [h.agent_name for h in cls._get_handoffs(current_agent)]
tool_names = [t.name for t in current_agent.tools]
if output_schema := cls._get_output_schema(current_agent):
output_type_name = output_schema.output_type_name()
else:
@ -513,11 +511,13 @@ class Runner:
current_span = agent_span(
name=current_agent.name,
handoffs=handoff_names,
tools=tool_names,
output_type=output_type_name,
)
current_span.start(mark_as_current=True)
all_tools = await cls._get_all_tools(current_agent)
tool_names = [t.name for t in all_tools]
current_span.span_data.tools = tool_names
current_turn += 1
streamed_result.current_turn = current_turn
@ -553,6 +553,7 @@ class Runner:
run_config,
should_run_agent_start_hooks,
tool_use_tracker,
all_tools,
)
should_run_agent_start_hooks = False
@ -621,6 +622,7 @@ class Runner:
run_config: RunConfig,
should_run_agent_start_hooks: bool,
tool_use_tracker: AgentToolUseTracker,
all_tools: list[Tool],
) -> SingleStepResult:
if should_run_agent_start_hooks:
await asyncio.gather(
@ -640,7 +642,6 @@ class Runner:
system_prompt = await agent.get_system_prompt(context_wrapper)
handoffs = cls._get_handoffs(agent)
all_tools = await cls._get_all_tools(agent)
model = cls._get_model(agent, run_config)
model_settings = agent.model_settings.resolve(run_config.model_settings)
model_settings = RunImpl.maybe_reset_tool_choice(agent, tool_use_tracker, model_settings)

View file

@ -9,6 +9,7 @@ from .create import (
get_current_trace,
guardrail_span,
handoff_span,
mcp_tools_span,
response_span,
speech_group_span,
speech_span,
@ -25,6 +26,7 @@ from .span_data import (
GenerationSpanData,
GuardrailSpanData,
HandoffSpanData,
MCPListToolsSpanData,
ResponseSpanData,
SpanData,
SpeechGroupSpanData,
@ -59,6 +61,7 @@ __all__ = [
"GenerationSpanData",
"GuardrailSpanData",
"HandoffSpanData",
"MCPListToolsSpanData",
"ResponseSpanData",
"SpeechGroupSpanData",
"SpeechSpanData",
@ -69,6 +72,7 @@ __all__ = [
"speech_group_span",
"speech_span",
"transcription_span",
"mcp_tools_span",
]

View file

@ -12,6 +12,7 @@ from .span_data import (
GenerationSpanData,
GuardrailSpanData,
HandoffSpanData,
MCPListToolsSpanData,
ResponseSpanData,
SpeechGroupSpanData,
SpeechSpanData,
@ -424,3 +425,31 @@ def speech_group_span(
parent=parent,
disabled=disabled,
)
def mcp_tools_span(
server: str | None = None,
result: list[str] | None = None,
span_id: str | None = None,
parent: Trace | Span[Any] | None = None,
disabled: bool = False,
) -> Span[MCPListToolsSpanData]:
"""Create a new MCP list tools span. The span will not be started automatically, you should
either do `with mcp_tools_span() ...` or call `span.start()` + `span.finish()` manually.
Args:
server: The name of the MCP server.
result: The result of the MCP list tools call.
span_id: The ID of the span. Optional. If not provided, we will generate an ID. We
recommend using `util.gen_span_id()` to generate a span ID, to guarantee that IDs are
correctly formatted.
parent: The parent span or trace. If not provided, we will automatically use the current
trace/span as the parent.
disabled: If True, we will return a Span but the Span will not be recorded.
"""
return GLOBAL_TRACE_PROVIDER.create_span(
span_data=MCPListToolsSpanData(server=server, result=result),
span_id=span_id,
parent=parent,
disabled=disabled,
)

View file

@ -49,12 +49,19 @@ class AgentSpanData(SpanData):
class FunctionSpanData(SpanData):
__slots__ = ("name", "input", "output")
__slots__ = ("name", "input", "output", "mcp_data")
def __init__(self, name: str, input: str | None, output: Any | None):
def __init__(
self,
name: str,
input: str | None,
output: Any | None,
mcp_data: dict[str, Any] | None = None,
):
self.name = name
self.input = input
self.output = output
self.mcp_data = mcp_data
@property
def type(self) -> str:
@ -66,6 +73,7 @@ class FunctionSpanData(SpanData):
"name": self.name,
"input": self.input,
"output": str(self.output) if self.output else None,
"mcp_data": self.mcp_data,
}
@ -282,3 +290,25 @@ class SpeechGroupSpanData(SpanData):
"type": self.type,
"input": self.input,
}
class MCPListToolsSpanData(SpanData):
__slots__ = (
"server",
"result",
)
def __init__(self, server: str | None = None, result: list[str] | None = None):
self.server = server
self.result = result
@property
def type(self) -> str:
return "mcp_tools"
def export(self) -> dict[str, Any]:
return {
"type": self.type,
"server": self.server,
"result": self.result,
}

View file

@ -10,9 +10,8 @@ from typing import Any, cast
from openai import AsyncOpenAI
from agents.exceptions import AgentsException
from ... import _debug
from ...exceptions import AgentsException
from ...logger import logger
from ...tracing import Span, SpanError, TranscriptionSpanData, transcription_span
from ..exceptions import STTWebsocketConnectionError

View file

@ -52,3 +52,7 @@ class FakeMCPServer(MCPServer):
return CallToolResult(
content=[TextContent(text=self.tool_results[-1], type="text")],
)
@property
def name(self) -> str:
return "fake_mcp_server"

View file

@ -0,0 +1,198 @@
import pytest
from inline_snapshot import snapshot
from agents import Agent, Runner
from ..fake_model import FakeModel
from ..test_responses import get_function_tool, get_function_tool_call, get_text_message
from ..testing_processor import SPAN_PROCESSOR_TESTING, fetch_normalized_spans
from .helpers import FakeMCPServer
@pytest.mark.asyncio
async def test_mcp_tracing():
model = FakeModel()
server = FakeMCPServer()
server.add_tool("test_tool_1", {})
agent = Agent(
name="test",
model=model,
mcp_servers=[server],
tools=[get_function_tool("non_mcp_tool", "tool_result")],
)
model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[get_text_message("a_message"), get_function_tool_call("test_tool_1", "")],
# Second turn: text message
[get_text_message("done")],
]
)
# First run: should list MCP tools before first and second steps
x = Runner.run_streamed(agent, input="first_test")
async for _ in x.stream_events():
pass
assert x.final_output == "done"
spans = fetch_normalized_spans()
# Should have a single tool listing, and the function span should have MCP data
assert spans == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test",
"handoffs": [],
"tools": ["test_tool_1", "non_mcp_tool"],
"output_type": "str",
},
"children": [
{
"type": "mcp_tools",
"data": {"server": "fake_mcp_server", "result": ["test_tool_1"]},
},
{
"type": "function",
"data": {
"name": "test_tool_1",
"input": "",
"output": '{"type":"text","text":"result_test_tool_1_{}","annotations":null}', # noqa: E501
"mcp_data": {"server": "fake_mcp_server"},
},
},
],
}
],
}
]
)
server.add_tool("test_tool_2", {})
SPAN_PROCESSOR_TESTING.clear()
model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[
get_text_message("a_message"),
get_function_tool_call("non_mcp_tool", ""),
get_function_tool_call("test_tool_2", ""),
],
# Second turn: text message
[get_text_message("done")],
]
)
await Runner.run(agent, input="second_test")
spans = fetch_normalized_spans()
# Should have a single tool listing, and the function span should have MCP data, and the non-mcp
# tool function span should not have MCP data
assert spans == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test",
"handoffs": [],
"tools": ["test_tool_1", "test_tool_2", "non_mcp_tool"],
"output_type": "str",
},
"children": [
{
"type": "mcp_tools",
"data": {
"server": "fake_mcp_server",
"result": ["test_tool_1", "test_tool_2"],
},
},
{
"type": "function",
"data": {
"name": "non_mcp_tool",
"input": "",
"output": "tool_result",
},
},
{
"type": "function",
"data": {
"name": "test_tool_2",
"input": "",
"output": '{"type":"text","text":"result_test_tool_2_{}","annotations":null}', # noqa: E501
"mcp_data": {"server": "fake_mcp_server"},
},
},
],
}
],
}
]
)
SPAN_PROCESSOR_TESTING.clear()
# Add more tools to the server
server.add_tool("test_tool_3", {})
model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[get_text_message("a_message"), get_function_tool_call("test_tool_3", "")],
# Second turn: text message
[get_text_message("done")],
]
)
await Runner.run(agent, input="third_test")
spans = fetch_normalized_spans()
# Should have a single tool listing, and the function span should have MCP data, and the non-mcp
# tool function span should not have MCP data
assert spans == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test",
"handoffs": [],
"tools": ["test_tool_1", "test_tool_2", "test_tool_3", "non_mcp_tool"],
"output_type": "str",
},
"children": [
{
"type": "mcp_tools",
"data": {
"server": "fake_mcp_server",
"result": ["test_tool_1", "test_tool_2", "test_tool_3"],
},
},
{
"type": "function",
"data": {
"name": "test_tool_3",
"input": "",
"output": '{"type":"text","text":"result_test_tool_3_{}","annotations":null}', # noqa: E501
"mcp_data": {"server": "fake_mcp_server"},
},
},
],
}
],
}
]
)

View file

@ -16,6 +16,10 @@ class CrashingClientSessionServer(_MCPServerWithClientSession):
self.cleanup_called = True
await super().cleanup()
@property
def name(self) -> str:
return "crashing_client_session_server"
@pytest.mark.asyncio
async def test_server_errors_cause_error_and_cleanup_called():