openai-agents-python/tests/test_cancel_streaming.py
Andrew Han a113fea0ee
Allow cancel out of the streaming result (#579)
Fix for #574 

@rm-openai I'm not sure how to add a test within the repo but I have
pasted a test script below that seems to work

```python
import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

async def main():
    agent = Agent(
        name="Joker",
        instructions="You are a helpful assistant.",
    )

    result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
    num_visible_event = 0
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
            num_visible_event += 1
            print(num_visible_event)
            if num_visible_event == 3:
                result.cancel()


if __name__ == "__main__":
    asyncio.run(main())
````
2025-04-23 19:51:10 -04:00

22 lines
602 B
Python

import pytest
from agents import Agent, Runner
from .fake_model import FakeModel
@pytest.mark.asyncio
async def test_joker_streamed_jokes_with_cancel():
model = FakeModel()
agent = Agent(name="Joker", model=model)
result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
num_events = 0
stop_after = 1 # There are two that the model gives back.
async for _event in result.stream_events():
num_events += 1
if num_events == 1:
result.cancel()
assert num_events == 1, f"Expected {stop_after} visible events, but got {num_events}"