From 995af4d83e0b0e086ad8f2b11f194fa29d1e27e1 Mon Sep 17 00:00:00 2001 From: Rohan Mehta Date: Mon, 2 Jun 2025 15:32:02 -0400 Subject: [PATCH] Only start tracing worker thread on first span/trace (#804) Closes #796. Shouldn't start a busy waiting thread if there aren't any traces. Test plan ``` import threading assert threading.active_count() == 1 import agents assert threading.active_count() == 1 ``` --- src/agents/tracing/processors.py | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index 2913b11..73f7331 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -188,10 +188,27 @@ class BatchTraceProcessor(TracingProcessor): # Track when we next *must* perform a scheduled export self._next_export_time = time.time() + self._schedule_delay - self._worker_thread = threading.Thread(target=self._run, daemon=True) - self._worker_thread.start() + # We lazily start the background worker thread the first time a span/trace is queued. + self._worker_thread: threading.Thread | None = None + self._thread_start_lock = threading.Lock() + + def _ensure_thread_started(self) -> None: + # Fast path without holding the lock + if self._worker_thread and self._worker_thread.is_alive(): + return + + # Double-checked locking to avoid starting multiple threads + with self._thread_start_lock: + if self._worker_thread and self._worker_thread.is_alive(): + return + + self._worker_thread = threading.Thread(target=self._run, daemon=True) + self._worker_thread.start() def on_trace_start(self, trace: Trace) -> None: + # Ensure the background worker is running before we enqueue anything. + self._ensure_thread_started() + try: self._queue.put_nowait(trace) except queue.Full: @@ -206,6 +223,9 @@ class BatchTraceProcessor(TracingProcessor): pass def on_span_end(self, span: Span[Any]) -> None: + # Ensure the background worker is running before we enqueue anything. + self._ensure_thread_started() + try: self._queue.put_nowait(span) except queue.Full: @@ -216,7 +236,13 @@ class BatchTraceProcessor(TracingProcessor): Called when the application stops. We signal our thread to stop, then join it. """ self._shutdown_event.set() - self._worker_thread.join(timeout=timeout) + + # Only join if we ever started the background thread; otherwise flush synchronously. + if self._worker_thread and self._worker_thread.is_alive(): + self._worker_thread.join(timeout=timeout) + else: + # No background thread: process any remaining items synchronously. + self._export_batches(force=True) def force_flush(self): """