322 lines
8.7 KiB
Python
322 lines
8.7 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from agents import Agent, RunConfig, Runner, trace
|
|
|
|
from .fake_model import FakeModel
|
|
from .test_responses import get_text_message
|
|
from .testing_processor import fetch_ordered_spans, fetch_traces
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_single_run_is_single_trace():
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
await Runner.run(agent, input="first_test")
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 1, (
|
|
f"Got {len(spans)}, but expected 1: the agent span. data:"
|
|
f"{[span.span_data for span in spans]}"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_runs_are_multiple_traces():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
]
|
|
)
|
|
agent = Agent(
|
|
name="test_agent_1",
|
|
model=model,
|
|
)
|
|
|
|
await Runner.run(agent, input="first_test")
|
|
await Runner.run(agent, input="second_test")
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 2, f"Expected 2 traces, got {len(traces)}"
|
|
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 2, f"Got {len(spans)}, but expected 2: agent span per run"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wrapped_trace_is_single_trace():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
[get_text_message("third_test")],
|
|
]
|
|
)
|
|
with trace(workflow_name="test_workflow"):
|
|
agent = Agent(
|
|
name="test_agent_1",
|
|
model=model,
|
|
)
|
|
|
|
await Runner.run(agent, input="first_test")
|
|
await Runner.run(agent, input="second_test")
|
|
await Runner.run(agent, input="third_test")
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 3, f"Got {len(spans)}, but expected 3: the agent span per run"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_parent_disabled_trace_disabled_agent_trace():
|
|
with trace(workflow_name="test_workflow", disabled=True):
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
await Runner.run(agent, input="first_test")
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 0, (
|
|
f"Expected no spans, got {len(spans)}, with {[x.span_data for x in spans]}"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_manual_disabling_works():
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
await Runner.run(agent, input="first_test", run_config=RunConfig(tracing_disabled=True))
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 0, f"Got {len(spans)}, but expected no spans"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_trace_config_works():
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
await Runner.run(
|
|
agent,
|
|
input="first_test",
|
|
run_config=RunConfig(workflow_name="Foo bar", group_id="123", trace_id="456"),
|
|
)
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
export = traces[0].export()
|
|
assert export is not None, "Trace export should not be None"
|
|
assert export["workflow_name"] == "Foo bar"
|
|
assert export["group_id"] == "123"
|
|
assert export["id"] == "456"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_not_starting_streaming_creates_trace():
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
result = Runner.run_streamed(agent, input="first_test")
|
|
|
|
# Purposely don't await the stream
|
|
while True:
|
|
if result.is_complete:
|
|
break
|
|
await asyncio.sleep(0.1)
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
spans = fetch_ordered_spans()
|
|
assert len(spans) == 1, f"Got {len(spans)}, but expected 1: the agent span"
|
|
|
|
# Await the stream to avoid warnings about it not being awaited
|
|
async for _ in result.stream_events():
|
|
pass
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_streaming_single_run_is_single_trace():
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=FakeModel(
|
|
initial_output=[get_text_message("first_test")],
|
|
),
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_streamed_runs_are_multiple_traces():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
]
|
|
)
|
|
agent = Agent(
|
|
name="test_agent_1",
|
|
model=model,
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
x = Runner.run_streamed(agent, input="second_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 2, f"Expected 2 traces, got {len(traces)}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wrapped_streaming_trace_is_single_trace():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
[get_text_message("third_test")],
|
|
]
|
|
)
|
|
with trace(workflow_name="test_workflow"):
|
|
agent = Agent(
|
|
name="test_agent_1",
|
|
model=model,
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
x = Runner.run_streamed(agent, input="second_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
x = Runner.run_streamed(agent, input="third_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wrapped_mixed_trace_is_single_trace():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
[get_text_message("third_test")],
|
|
]
|
|
)
|
|
with trace(workflow_name="test_workflow"):
|
|
agent = Agent(
|
|
name="test_agent_1",
|
|
model=model,
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
await Runner.run(agent, input="second_test")
|
|
|
|
x = Runner.run_streamed(agent, input="third_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 1, f"Expected 1 trace, got {len(traces)}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_parent_disabled_trace_disables_streaming_agent_trace():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
]
|
|
)
|
|
with trace(workflow_name="test_workflow", disabled=True):
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=model,
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test")
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_manual_streaming_disabling_works():
|
|
model = FakeModel()
|
|
model.add_multiple_turn_outputs(
|
|
[
|
|
[get_text_message("first_test")],
|
|
[get_text_message("second_test")],
|
|
]
|
|
)
|
|
agent = Agent(
|
|
name="test_agent",
|
|
model=model,
|
|
)
|
|
|
|
x = Runner.run_streamed(agent, input="first_test", run_config=RunConfig(tracing_disabled=True))
|
|
async for _ in x.stream_events():
|
|
pass
|
|
|
|
traces = fetch_traces()
|
|
assert len(traces) == 0, f"Expected 0 traces, got {len(traces)}"
|