Examples and tests for previous_response_id (#512)

Examples + tests
This commit is contained in:
Rohan Mehta 2025-04-15 12:46:31 -04:00 committed by GitHub
parent b978b4382e
commit f329eef7e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 158 additions and 0 deletions

View file

@ -0,0 +1,66 @@
import asyncio
from agents import Agent, Runner
"""This demonstrates usage of the `previous_response_id` parameter to continue a conversation.
The second run passes the previous response ID to the model, which allows it to continue the
conversation without re-sending the previous messages.
Notes:
1. This only applies to the OpenAI Responses API. Other models will ignore this parameter.
2. Responses are only stored for 30 days as of this writing, so in production you should
store the response ID along with an expiration date; if the response is no longer valid,
you'll need to re-send the previous conversation history.
"""
async def main():
agent = Agent(
name="Assistant",
instructions="You are a helpful assistant. be VERY concise.",
)
result = await Runner.run(agent, "What is the largest country in South America?")
print(result.final_output)
# Brazil
result = await Runner.run(
agent,
"What is the capital of that country?",
previous_response_id=result.last_response_id,
)
print(result.final_output)
# Brasilia
async def main_stream():
agent = Agent(
name="Assistant",
instructions="You are a helpful assistant. be VERY concise.",
)
result = Runner.run_streamed(agent, "What is the largest country in South America?")
async for event in result.stream_events():
if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
print(event.data.delta, end="", flush=True)
print()
result = Runner.run_streamed(
agent,
"What is the capital of that country?",
previous_response_id=result.last_response_id,
)
async for event in result.stream_events():
if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
print(event.data.delta, end="", flush=True)
if __name__ == "__main__":
is_stream = input("Run in stream mode? (y/n): ")
if is_stream == "y":
asyncio.run(main_stream())
else:
asyncio.run(main())

View file

@ -63,6 +63,7 @@ class FakeModel(Model):
"model_settings": model_settings,
"tools": tools,
"output_schema": output_schema,
"previous_response_id": previous_response_id,
}
with generation_span(disabled=not self.tracing_enabled) as span:
@ -98,6 +99,14 @@ class FakeModel(Model):
*,
previous_response_id: str | None,
) -> AsyncIterator[TResponseStreamEvent]:
self.last_turn_args = {
"system_instructions": system_instructions,
"input": input,
"model_settings": model_settings,
"tools": tools,
"output_schema": output_schema,
"previous_response_id": previous_response_id,
}
with generation_span(disabled=not self.tracing_enabled) as span:
output = self.get_next_output()
if isinstance(output, Exception):

View file

@ -662,3 +662,86 @@ async def test_model_settings_override():
# temperature is overridden by Runner.run, but max_tokens is not
assert model.last_turn_args["model_settings"].temperature == 0.5
assert model.last_turn_args["model_settings"].max_tokens == 1000
@pytest.mark.asyncio
async def test_previous_response_id_passed_between_runs():
"""Test that previous_response_id is passed to the model on subsequent runs."""
model = FakeModel()
model.set_next_output([get_text_message("done")])
agent = Agent(name="test", model=model)
assert model.last_turn_args.get("previous_response_id") is None
await Runner.run(agent, input="test", previous_response_id="resp-non-streamed-test")
assert model.last_turn_args.get("previous_response_id") == "resp-non-streamed-test"
@pytest.mark.asyncio
async def test_multi_turn_previous_response_id_passed_between_runs():
"""Test that previous_response_id is passed to the model on subsequent runs."""
model = FakeModel()
agent = Agent(
name="test",
model=model,
tools=[get_function_tool("foo", "tool_result")],
)
model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[get_text_message("a_message"), get_function_tool_call("foo", json.dumps({"a": "b"}))],
# Second turn: text message
[get_text_message("done")],
]
)
assert model.last_turn_args.get("previous_response_id") is None
await Runner.run(agent, input="test", previous_response_id="resp-test-123")
assert model.last_turn_args.get("previous_response_id") == "resp-test-123"
@pytest.mark.asyncio
async def test_previous_response_id_passed_between_runs_streamed():
"""Test that previous_response_id is passed to the model on subsequent streamed runs."""
model = FakeModel()
model.set_next_output([get_text_message("done")])
agent = Agent(
name="test",
model=model,
)
assert model.last_turn_args.get("previous_response_id") is None
result = Runner.run_streamed(agent, input="test", previous_response_id="resp-stream-test")
async for _ in result.stream_events():
pass
assert model.last_turn_args.get("previous_response_id") == "resp-stream-test"
@pytest.mark.asyncio
async def test_previous_response_id_passed_between_runs_streamed_multi_turn():
"""Test that previous_response_id is passed to the model on subsequent streamed runs."""
model = FakeModel()
agent = Agent(
name="test",
model=model,
tools=[get_function_tool("foo", "tool_result")],
)
model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[get_text_message("a_message"), get_function_tool_call("foo", json.dumps({"a": "b"}))],
# Second turn: text message
[get_text_message("done")],
]
)
assert model.last_turn_args.get("previous_response_id") is None
result = Runner.run_streamed(agent, input="test", previous_response_id="resp-stream-test")
async for _ in result.stream_events():
pass
assert model.last_turn_args.get("previous_response_id") == "resp-stream-test"