Stronger tracing tests with inline-snapshot (#25)

`assert len(spans) == 12` is a very weak assertion. This PR asserts the
exported traces and spans more precisely in a readable tree format. And
when the format of an exported trace/span changes (e.g. a new key is
added to every span), you can use `pytest --inline-snapshot=fix` to
update all relevant tests automatically. See
https://15r10nk.github.io/inline-snapshot/latest/ for more info.
This commit is contained in:
Rohan Mehta 2025-03-17 17:59:04 -04:00 committed by GitHub
commit 3a97b15b89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 840 additions and 4 deletions

View file

@ -3,12 +3,13 @@ from __future__ import annotations
import asyncio
import pytest
from inline_snapshot import snapshot
from agents import Agent, RunConfig, Runner, trace
from .fake_model import FakeModel
from .test_responses import get_text_message
from .testing_processor import fetch_ordered_spans, fetch_traces
from .testing_processor import fetch_normalized_spans, fetch_ordered_spans, fetch_traces
@pytest.mark.asyncio
@ -25,6 +26,25 @@ async def test_single_run_is_single_trace():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent",
"handoffs": [],
"tools": [],
"output_type": "str",
},
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 1, (
f"Got {len(spans)}, but expected 1: the agent span. data:"
@ -52,6 +72,39 @@ async def test_multiple_runs_are_multiple_traces():
traces = fetch_traces()
assert len(traces) == 2, f"Expected 2 traces, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": [],
"tools": [],
"output_type": "str",
},
}
],
},
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": [],
"tools": [],
"output_type": "str",
},
}
],
},
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, f"Got {len(spans)}, but expected 2: agent span per run"
@ -79,6 +132,43 @@ async def test_wrapped_trace_is_single_trace():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "test_workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": [],
"tools": [],
"output_type": "str",
},
},
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": [],
"tools": [],
"output_type": "str",
},
},
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": [],
"tools": [],
"output_type": "str",
},
},
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 3, f"Got {len(spans)}, but expected 3: the agent span per run"
@ -97,6 +187,8 @@ async def test_parent_disabled_trace_disabled_agent_trace():
traces = fetch_traces()
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
assert fetch_normalized_spans() == snapshot([])
spans = fetch_ordered_spans()
assert len(spans) == 0, (
f"Expected no spans, got {len(spans)}, with {[x.span_data for x in spans]}"
@ -116,6 +208,8 @@ async def test_manual_disabling_works():
traces = fetch_traces()
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
assert fetch_normalized_spans() == snapshot([])
spans = fetch_ordered_spans()
assert len(spans) == 0, f"Got {len(spans)}, but expected no spans"
@ -164,6 +258,25 @@ async def test_not_starting_streaming_creates_trace():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent",
"handoffs": [],
"tools": [],
"output_type": "str",
},
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 1, f"Got {len(spans)}, but expected 1: the agent span"

View file

@ -1,4 +1,5 @@
import pytest
from inline_snapshot import snapshot
from openai import AsyncOpenAI
from openai.types.responses import ResponseCompletedEvent
@ -6,7 +7,7 @@ from agents import ModelSettings, ModelTracing, OpenAIResponsesModel, trace
from agents.tracing.span_data import ResponseSpanData
from tests import fake_model
from .testing_processor import fetch_ordered_spans
from .testing_processor import fetch_normalized_spans, fetch_ordered_spans
class DummyTracing:
@ -54,6 +55,15 @@ async def test_get_response_creates_trace(monkeypatch):
"instr", "input", ModelSettings(), [], None, [], ModelTracing.ENABLED
)
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "test",
"children": [{"type": "response", "data": {"response_id": "dummy-id"}}],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 1
@ -82,6 +92,10 @@ async def test_non_data_tracing_doesnt_set_response_id(monkeypatch):
"instr", "input", ModelSettings(), [], None, [], ModelTracing.ENABLED_WITHOUT_DATA
)
assert fetch_normalized_spans() == snapshot(
[{"workflow_name": "test", "children": [{"type": "response"}]}]
)
spans = fetch_ordered_spans()
assert len(spans) == 1
assert spans[0].span_data.response is None
@ -107,6 +121,8 @@ async def test_disable_tracing_does_not_create_span(monkeypatch):
"instr", "input", ModelSettings(), [], None, [], ModelTracing.DISABLED
)
assert fetch_normalized_spans() == snapshot([{"workflow_name": "test"}])
spans = fetch_ordered_spans()
assert len(spans) == 0
@ -139,6 +155,15 @@ async def test_stream_response_creates_trace(monkeypatch):
):
pass
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "test",
"children": [{"type": "response", "data": {"response_id": "dummy-id-123"}}],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 1
assert isinstance(spans[0].span_data, ResponseSpanData)
@ -174,6 +199,10 @@ async def test_stream_non_data_tracing_doesnt_set_response_id(monkeypatch):
):
pass
assert fetch_normalized_spans() == snapshot(
[{"workflow_name": "test", "children": [{"type": "response"}]}]
)
spans = fetch_ordered_spans()
assert len(spans) == 1
assert isinstance(spans[0].span_data, ResponseSpanData)
@ -208,5 +237,7 @@ async def test_stream_disabled_tracing_doesnt_create_span(monkeypatch):
):
pass
assert fetch_normalized_spans() == snapshot([{"workflow_name": "test"}])
spans = fetch_ordered_spans()
assert len(spans) == 0

View file

@ -4,6 +4,7 @@ import json
from typing import Any
import pytest
from inline_snapshot import snapshot
from typing_extensions import TypedDict
from agents import (
@ -27,7 +28,7 @@ from .test_responses import (
get_handoff_tool_call,
get_text_message,
)
from .testing_processor import fetch_ordered_spans, fetch_traces
from .testing_processor import fetch_normalized_spans, fetch_ordered_spans, fetch_traces
@pytest.mark.asyncio
@ -45,6 +46,34 @@ async def test_single_turn_model_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent",
"handoffs": [],
"tools": [],
"output_type": "str",
},
"children": [
{
"type": "generation",
"error": {
"message": "Error",
"data": {"name": "ValueError", "message": "test error"},
},
}
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, f"should have agent and generation spans, got {len(spans)}"
@ -80,6 +109,43 @@ async def test_multi_turn_no_handoffs():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent",
"handoffs": [],
"tools": ["foo"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "foo",
"input": '{"a": "b"}',
"output": "tool_result",
},
},
{
"type": "generation",
"error": {
"message": "Error",
"data": {"name": "ValueError", "message": "test error"},
},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 4, (
f"should have agent, generation, tool, generation, got {len(spans)} with data: "
@ -110,6 +176,39 @@ async def test_tool_call_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent",
"handoffs": [],
"tools": ["foo"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"error": {
"message": "Error running tool",
"data": {
"tool_name": "foo",
"error": "Invalid JSON input for tool foo: bad_json",
},
},
"data": {"name": "foo", "input": "bad_json"},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 3, (
f"should have agent, generation, tool spans, got {len(spans)} with data: "
@ -159,6 +258,43 @@ async def test_multiple_handoff_doesnt_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test",
"handoffs": ["test", "test"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{"type": "handoff", "data": {"from_agent": "test", "to_agent": "test"}},
],
},
{
"type": "agent",
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "str"},
"children": [{"type": "generation"}],
},
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 7, (
f"should have 2 agent, 1 function, 3 generation, 1 handoff, got {len(spans)} with data: "
@ -193,6 +329,21 @@ async def test_multiple_final_output_doesnt_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "Foo"},
"children": [{"type": "generation"}],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, (
f"should have 1 agent, 1 generation, got {len(spans)} with data: "
@ -251,6 +402,76 @@ async def test_handoffs_lead_to_correct_agent_spans():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_3", "to_agent": "test_agent_1"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": ["test_agent_3"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_1", "to_agent": "test_agent_3"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [{"type": "generation"}],
},
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 12, (
f"should have 3 agents, 2 function, 5 generation, 2 handoff, got {len(spans)} with data: "
@ -285,6 +506,38 @@ async def test_max_turns_exceeded():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {"message": "Max turns exceeded", "data": {"max_turns": 2}},
"data": {
"name": "test",
"handoffs": [],
"tools": ["foo"],
"output_type": "Foo",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {"name": "foo", "input": "", "output": "result"},
},
{"type": "generation"},
{
"type": "function",
"data": {"name": "foo", "input": "", "output": "result"},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 5, (
f"should have 1 agent span, 2 generations, 2 function calls, got "
@ -318,6 +571,30 @@ async def test_guardrail_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {
"message": "Guardrail tripwire triggered",
"data": {"guardrail": "guardrail_function"},
},
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "str"},
"children": [
{
"type": "guardrail",
"data": {"name": "guardrail_function", "triggered": True},
}
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, (
f"should have 1 agent, 1 guardrail, got {len(spans)} with data: "

View file

@ -5,6 +5,7 @@ import json
from typing import Any
import pytest
from inline_snapshot import snapshot
from typing_extensions import TypedDict
from agents import (
@ -32,7 +33,7 @@ from .test_responses import (
get_handoff_tool_call,
get_text_message,
)
from .testing_processor import fetch_ordered_spans, fetch_traces
from .testing_processor import fetch_normalized_spans, fetch_ordered_spans, fetch_traces
@pytest.mark.asyncio
@ -52,6 +53,35 @@ async def test_single_turn_model_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {"message": "Error in agent run", "data": {"error": "test error"}},
"data": {
"name": "test_agent",
"handoffs": [],
"tools": [],
"output_type": "str",
},
"children": [
{
"type": "generation",
"error": {
"message": "Error",
"data": {"name": "ValueError", "message": "test error"},
},
}
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, f"should have agent and generation spans, got {len(spans)}"
@ -89,6 +119,44 @@ async def test_multi_turn_no_handoffs():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {"message": "Error in agent run", "data": {"error": "test error"}},
"data": {
"name": "test_agent",
"handoffs": [],
"tools": ["foo"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "foo",
"input": '{"a": "b"}',
"output": "tool_result",
},
},
{
"type": "generation",
"error": {
"message": "Error",
"data": {"name": "ValueError", "message": "test error"},
},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 4, (
f"should have agent, generation, tool, generation, got {len(spans)} with data: "
@ -121,6 +189,43 @@ async def test_tool_call_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {
"message": "Error in agent run",
"data": {"error": "Invalid JSON input for tool foo: bad_json"},
},
"data": {
"name": "test_agent",
"handoffs": [],
"tools": ["foo"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"error": {
"message": "Error running tool",
"data": {
"tool_name": "foo",
"error": "Invalid JSON input for tool foo: bad_json",
},
},
"data": {"name": "foo", "input": "bad_json"},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 3, (
f"should have agent, generation, tool spans, got {len(spans)} with data: "
@ -173,6 +278,43 @@ async def test_multiple_handoff_doesnt_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test",
"handoffs": ["test", "test"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{"type": "handoff", "data": {"from_agent": "test", "to_agent": "test"}},
],
},
{
"type": "agent",
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "str"},
"children": [{"type": "generation"}],
},
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 7, (
f"should have 2 agent, 1 function, 3 generation, 1 handoff, got {len(spans)} with data: "
@ -211,6 +353,21 @@ async def test_multiple_final_output_no_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "Foo"},
"children": [{"type": "generation"}],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, (
f"should have 1 agent, 1 generation, got {len(spans)} with data: "
@ -271,12 +428,152 @@ async def test_handoffs_lead_to_correct_agent_spans():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_3", "to_agent": "test_agent_1"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": ["test_agent_3"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_1", "to_agent": "test_agent_3"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [{"type": "generation"}],
},
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 12, (
f"should have 3 agents, 2 function, 5 generation, 2 handoff, got {len(spans)} with data: "
f"{[x.span_data for x in spans]}"
)
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_3", "to_agent": "test_agent_1"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_1",
"handoffs": ["test_agent_3"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {
"name": "some_function",
"input": '{"a": "b"}',
"output": "result",
},
},
{"type": "generation"},
{
"type": "handoff",
"data": {"from_agent": "test_agent_1", "to_agent": "test_agent_3"},
},
],
},
{
"type": "agent",
"data": {
"name": "test_agent_3",
"handoffs": ["test_agent_1", "test_agent_2"],
"tools": ["some_function"],
"output_type": "str",
},
"children": [{"type": "generation"}],
},
],
}
]
)
@pytest.mark.asyncio
async def test_max_turns_exceeded():
@ -307,6 +604,38 @@ async def test_max_turns_exceeded():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {"message": "Max turns exceeded", "data": {"max_turns": 2}},
"data": {
"name": "test",
"handoffs": [],
"tools": ["foo"],
"output_type": "Foo",
},
"children": [
{"type": "generation"},
{
"type": "function",
"data": {"name": "foo", "input": "", "output": "result"},
},
{"type": "generation"},
{
"type": "function",
"data": {"name": "foo", "input": "", "output": "result"},
},
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 5, (
f"should have 1 agent, 2 generations, 2 function calls, got "
@ -347,6 +676,33 @@ async def test_input_guardrail_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {
"message": "Guardrail tripwire triggered",
"data": {
"guardrail": "input_guardrail_function",
"type": "input_guardrail",
},
},
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "str"},
"children": [
{
"type": "guardrail",
"data": {"name": "input_guardrail_function", "triggered": True},
}
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, (
f"should have 1 agent, 1 guardrail, got {len(spans)} with data: "
@ -387,6 +743,30 @@ async def test_output_guardrail_error():
traces = fetch_traces()
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
assert fetch_normalized_spans() == snapshot(
[
{
"workflow_name": "Agent workflow",
"children": [
{
"type": "agent",
"error": {
"message": "Guardrail tripwire triggered",
"data": {"guardrail": "output_guardrail_function"},
},
"data": {"name": "test", "handoffs": [], "tools": [], "output_type": "str"},
"children": [
{
"type": "guardrail",
"data": {"name": "output_guardrail_function", "triggered": True},
}
],
}
],
}
]
)
spans = fetch_ordered_spans()
assert len(spans) == 2, (
f"should have 1 agent, 1 guardrail, got {len(spans)} with data: "

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import threading
from datetime import datetime
from typing import Any, Literal
from agents.tracing import Span, Trace, TracingProcessor
@ -77,3 +78,37 @@ def fetch_traces() -> list[Trace]:
def fetch_events() -> list[TestSpanProcessorEvent]:
return SPAN_PROCESSOR_TESTING._events
def fetch_normalized_spans():
nodes: dict[tuple[str, str | None], dict[str, Any]] = {}
traces = []
for trace_obj in fetch_traces():
trace = trace_obj.export()
assert trace
assert trace.pop("object") == "trace"
assert trace.pop("id").startswith("trace_")
trace = {k: v for k, v in trace.items() if v is not None}
nodes[(trace_obj.trace_id, None)] = trace
traces.append(trace)
if not traces:
assert not fetch_ordered_spans()
for span_obj in fetch_ordered_spans():
span = span_obj.export()
assert span
assert span.pop("object") == "trace.span"
assert span.pop("id").startswith("span_")
assert datetime.fromisoformat(span.pop("started_at"))
assert datetime.fromisoformat(span.pop("ended_at"))
parent_id = span.pop("parent_id")
assert "type" not in span
span_data = span.pop("span_data")
span = {"type": span_data.pop("type")} | {k: v for k, v in span.items() if v is not None}
span_data = {k: v for k, v in span_data.items() if v is not None}
if span_data:
span["data"] = span_data
nodes[(span_obj.trace_id, span_obj.span_id)] = span
nodes[(span.pop("trace_id"), parent_id)].setdefault("children", []).append(span)
return traces