Preview: worker.py
Size: 10.91 KB
/opt/hc_python/lib64/python3.12/site-packages/sentry_sdk/worker.py
import asyncio
import os
import threading
from abc import ABC, abstractmethod
from time import sleep, time
from typing import TYPE_CHECKING
from sentry_sdk._queue import FullError, Queue
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
from sentry_sdk.utils import logger, mark_sentry_task_internal
if TYPE_CHECKING:
from typing import Any, Callable, Optional
_TERMINATOR = object()
class Worker(ABC):
"""Base class for all workers."""
@property
@abstractmethod
def is_alive(self) -> bool:
"""Whether the worker is alive and running."""
pass
@abstractmethod
def kill(self) -> None:
"""Kill the worker. It will not process any more events."""
pass
def flush(
self, timeout: float, callback: "Optional[Callable[[int, float], Any]]" = None
) -> None:
"""Flush the worker, blocking until done or timeout is reached."""
return None
@abstractmethod
def full(self) -> bool:
"""Whether the worker's queue is full."""
pass
@abstractmethod
def submit(self, callback: "Callable[[], Any]") -> bool:
"""Schedule a callback. Returns True if queued, False if full."""
pass
class BackgroundWorker(Worker):
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
self._queue: "Queue" = Queue(queue_size)
self._lock = threading.Lock()
self._thread: "Optional[threading.Thread]" = None
self._thread_for_pid: "Optional[int]" = None
@property
def is_alive(self) -> bool:
if self._thread_for_pid != os.getpid():
return False
if not self._thread:
return False
return self._thread.is_alive()
def _ensure_thread(self) -> None:
if not self.is_alive:
self.start()
def _timed_queue_join(self, timeout: float) -> bool:
deadline = time() + timeout
queue = self._queue
queue.all_tasks_done.acquire()
try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
return False
queue.all_tasks_done.wait(timeout=delay)
return True
finally:
queue.all_tasks_done.release()
def start(self) -> None:
with self._lock:
if not self.is_alive:
self._thread = threading.Thread(
target=self._target, name="sentry-sdk.BackgroundWorker"
)
self._thread.daemon = True
try:
self._thread.start()
self._thread_for_pid = os.getpid()
except RuntimeError:
# At this point we can no longer start because the interpreter
# is already shutting down. Sadly at this point we can no longer
# send out events.
self._thread = None
def kill(self) -> None:
"""
Kill worker thread. Returns immediately. Not useful for
waiting on shutdown for events, use `flush` for that.
"""
logger.debug("background worker got kill request")
with self._lock:
if self._thread:
try:
self._queue.put_nowait(_TERMINATOR)
except FullError:
logger.debug("background worker queue full, kill failed")
self._thread = None
self._thread_for_pid = None
def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None:
logger.debug("background worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")
def full(self) -> bool:
return self._queue.full()
def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None:
initial_timeout = min(0.1, timeout)
if not self._timed_queue_join(initial_timeout):
pending = self._queue.qsize() + 1
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)
if not self._timed_queue_join(timeout - initial_timeout):
pending = self._queue.qsize() + 1
logger.error("flush timed out, dropped %s events", pending)
def submit(self, callback: "Callable[[], Any]") -> bool:
self._ensure_thread()
try:
self._queue.put_nowait(callback)
return True
except FullError:
return False
def _target(self) -> None:
while True:
callback = self._queue.get()
try:
if callback is _TERMINATOR:
break
try:
callback()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
self._queue.task_done()
sleep(0)
class AsyncWorker(Worker):
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
self._queue: "Optional[asyncio.Queue[Any]]" = None
self._queue_size = queue_size
self._task: "Optional[asyncio.Task[None]]" = None
# Event loop needs to remain in the same process
self._task_for_pid: "Optional[int]" = None
self._loop: "Optional[asyncio.AbstractEventLoop]" = None
# Track active callback tasks so they have a strong reference and can be cancelled on kill
self._active_tasks: "set[asyncio.Task[None]]" = set()
@property
def is_alive(self) -> bool:
if self._task_for_pid != os.getpid():
return False
if not self._task or not self._loop:
return False
return self._loop.is_running() and not self._task.done()
def kill(self) -> None:
if self._task:
# Cancel the main consumer task to prevent duplicate consumers
self._task.cancel()
# Also cancel any active callback tasks
# Avoid modifying the set while cancelling tasks
tasks_to_cancel = set(self._active_tasks)
for task in tasks_to_cancel:
task.cancel()
self._active_tasks.clear()
self._loop = None
self._task = None
self._task_for_pid = None
def start(self) -> None:
if not self.is_alive:
try:
self._loop = asyncio.get_running_loop()
# Always create a fresh queue on start to avoid stale items
self._queue = asyncio.Queue(maxsize=self._queue_size)
with mark_sentry_task_internal():
self._task = self._loop.create_task(self._target())
self._task_for_pid = os.getpid()
except RuntimeError:
# There is no event loop running
logger.warning("No event loop running, async worker not started")
self._loop = None
self._task = None
self._task_for_pid = None
def full(self) -> bool:
if self._queue is None:
return True
return self._queue.full()
def _ensure_task(self) -> None:
if not self.is_alive:
self.start()
async def _wait_flush(
self, timeout: float, callback: "Optional[Any]" = None
) -> None:
if not self._loop or not self._loop.is_running() or self._queue is None:
return
initial_timeout = min(0.1, timeout)
# Timeout on the join
try:
await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
except asyncio.TimeoutError:
pending = self._queue.qsize() + len(self._active_tasks)
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)
try:
remaining_timeout = timeout - initial_timeout
await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
except asyncio.TimeoutError:
pending = self._queue.qsize() + len(self._active_tasks)
logger.error("flush timed out, dropped %s events", pending)
def flush( # type: ignore[override]
self, timeout: float, callback: "Optional[Any]" = None
) -> "Optional[asyncio.Task[None]]":
if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
with mark_sentry_task_internal():
return self._loop.create_task(self._wait_flush(timeout, callback))
return None
def submit(self, callback: "Callable[[], Any]") -> bool:
self._ensure_task()
if self._queue is None:
return False
try:
self._queue.put_nowait(callback)
return True
except asyncio.QueueFull:
return False
async def _target(self) -> None:
if self._queue is None:
return
try:
while True:
callback = await self._queue.get()
if callback is _TERMINATOR:
self._queue.task_done()
break
# Firing tasks instead of awaiting them allows for concurrent requests
with mark_sentry_task_internal():
task = asyncio.create_task(self._process_callback(callback))
# Create a strong reference to the task so it can be cancelled on kill
# and does not get garbage collected while running
self._active_tasks.add(task)
# Capture queue ref at dispatch time so done callbacks use the
# correct queue even if kill()/start() replace self._queue.
queue_ref = self._queue
task.add_done_callback(lambda t: self._on_task_complete(t, queue_ref))
# Yield to let the event loop run other tasks
await asyncio.sleep(0)
except asyncio.CancelledError:
pass # Expected during kill()
async def _process_callback(self, callback: "Callable[[], Any]") -> None:
# Callback is an async coroutine, need to await it
await callback()
def _on_task_complete(
self,
task: "asyncio.Task[None]",
queue: "Optional[asyncio.Queue[Any]]" = None,
) -> None:
try:
task.result()
except asyncio.CancelledError:
pass # Task was cancelled, expected during shutdown
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
# Mark the task as done and remove it from the active tasks set
# Use the queue reference captured at dispatch time, not self._queue,
# to avoid calling task_done() on a different queue after kill()/start().
if queue is not None:
queue.task_done()
self._active_tasks.discard(task)
Directory Contents
Dirs: 5 × Files: 36