Only start tracing worker thread on first span/trace (#804)
Closes #796. Shouldn't start a busy waiting thread if there aren't any traces. Test plan ``` import threading assert threading.active_count() == 1 import agents assert threading.active_count() == 1 ```
This commit is contained in:
parent
d4c7a23e1d
commit
995af4d83e
1 changed files with 29 additions and 3 deletions
|
|
@ -188,10 +188,27 @@ class BatchTraceProcessor(TracingProcessor):
|
|||
# Track when we next *must* perform a scheduled export
|
||||
self._next_export_time = time.time() + self._schedule_delay
|
||||
|
||||
self._worker_thread = threading.Thread(target=self._run, daemon=True)
|
||||
self._worker_thread.start()
|
||||
# We lazily start the background worker thread the first time a span/trace is queued.
|
||||
self._worker_thread: threading.Thread | None = None
|
||||
self._thread_start_lock = threading.Lock()
|
||||
|
||||
def _ensure_thread_started(self) -> None:
|
||||
# Fast path without holding the lock
|
||||
if self._worker_thread and self._worker_thread.is_alive():
|
||||
return
|
||||
|
||||
# Double-checked locking to avoid starting multiple threads
|
||||
with self._thread_start_lock:
|
||||
if self._worker_thread and self._worker_thread.is_alive():
|
||||
return
|
||||
|
||||
self._worker_thread = threading.Thread(target=self._run, daemon=True)
|
||||
self._worker_thread.start()
|
||||
|
||||
def on_trace_start(self, trace: Trace) -> None:
|
||||
# Ensure the background worker is running before we enqueue anything.
|
||||
self._ensure_thread_started()
|
||||
|
||||
try:
|
||||
self._queue.put_nowait(trace)
|
||||
except queue.Full:
|
||||
|
|
@ -206,6 +223,9 @@ class BatchTraceProcessor(TracingProcessor):
|
|||
pass
|
||||
|
||||
def on_span_end(self, span: Span[Any]) -> None:
|
||||
# Ensure the background worker is running before we enqueue anything.
|
||||
self._ensure_thread_started()
|
||||
|
||||
try:
|
||||
self._queue.put_nowait(span)
|
||||
except queue.Full:
|
||||
|
|
@ -216,7 +236,13 @@ class BatchTraceProcessor(TracingProcessor):
|
|||
Called when the application stops. We signal our thread to stop, then join it.
|
||||
"""
|
||||
self._shutdown_event.set()
|
||||
self._worker_thread.join(timeout=timeout)
|
||||
|
||||
# Only join if we ever started the background thread; otherwise flush synchronously.
|
||||
if self._worker_thread and self._worker_thread.is_alive():
|
||||
self._worker_thread.join(timeout=timeout)
|
||||
else:
|
||||
# No background thread: process any remaining items synchronously.
|
||||
self._export_batches(force=True)
|
||||
|
||||
def force_flush(self):
|
||||
"""
|
||||
|
|
|
|||
Loading…
Reference in a new issue