PHP 8.3.31
Preview: _batcher.py Size: 5.70 KB
/proc/thread-self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/_batcher.py

import os
import random
import threading
import weakref
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Generic, TypeVar

from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp

if TYPE_CHECKING:
    from typing import Any, Callable, Optional

T = TypeVar("T")


class Batcher(Generic[T]):
    """Collect items of type ``T`` in memory and send them in batches.

    Items are appended to an in-memory buffer by ``add()``. A daemon thread
    running ``_flush_loop()`` periodically packs the buffered items into an
    ``Envelope`` and passes it to the ``capture_func`` given to ``__init__``.
    """

    # Buffer length at which add() wakes the flusher thread early.
    MAX_BEFORE_FLUSH = 100
    # Buffer length at which add() stops buffering and hands the item to
    # _record_lost() instead.
    MAX_BEFORE_DROP = 1_000
    # Base wait, in seconds, between two flushes in _flush_loop(); a random
    # fraction of a second is added to every wait.
    FLUSH_WAIT_TIME = 5.0

    # Item type and content type written by _add_to_envelope(). Both are
    # empty here -- presumably overridden by subclasses; TODO confirm.
    TYPE = ""
    CONTENT_TYPE = ""

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        self._buffer: "list[T]" = []
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()
        self._active: "threading.local" = threading.local()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

        # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
        if hasattr(os, "register_at_fork"):
            weak_reset = weakref.WeakMethod(self._reset_thread_state)

            def _reset_in_child() -> None:
                method = weak_reset()
                if method is not None:
                    method()

            os.register_at_fork(after_in_child=_reset_in_child)

    def _reset_thread_state(self) -> None:
        self._buffer = []
        self._running = True
        self._lock = threading.Lock()
        self._active = threading.local()
        self._flush_event = threading.Event()
        self._flusher = None
        self._flusher_pid = None

    def _ensure_thread(self) -> bool:
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start the
            # the flusher in the meantime
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return False

        return True

    def _flush_loop(self) -> None:
        # Mark the flush-loop thread as active for its entire lifetime so
        # that any re-entrant add() triggered by GC warnings during wait(),
        # flush(), or Event operations is silently dropped instead of
        # deadlocking on internal locks.
        self._active.flag = True
        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(self, item: "T") -> None:
        # Bail out if the current thread is already executing batcher code.
        # This prevents deadlocks when code running inside the batcher (e.g.
        # _add_to_envelope during flush, or _flush_event.wait/set) triggers
        # a GC-emitted warning that routes back through the logging
        # integration into add().
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True
        try:
            if not self._ensure_thread() or self._flusher is None:
                return None

            with self._lock:
                if len(self._buffer) >= self.MAX_BEFORE_DROP:
                    self._record_lost(item)
                    return None

                self._buffer.append(item)
                if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                    self._flush_event.set()
        finally:
            self._active.flag = False

    def kill(self) -> None:
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    def flush(self) -> None:
        was_active = getattr(self._active, "flag", False)
        self._active.flag = True
        try:
            self._flush()
        finally:
            self._active.flag = was_active

    def _add_to_envelope(self, envelope: "Envelope") -> None:
        """Add one item holding all buffered entries to ``envelope``.

        The item uses ``TYPE`` and ``CONTENT_TYPE``, carries the number of
        buffered entries in its ``item_count`` header, and has a JSON payload
        listing every entry converted with ``_to_transport_format()``. The
        buffer itself is not modified here.
        """
        pending = self._buffer

        item_headers = {
            "item_count": len(pending),
        }
        body = {
            "version": 2,
            "items": [self._to_transport_format(entry) for entry in pending],
        }

        batch_item = Item(
            type=self.TYPE,
            content_type=self.CONTENT_TYPE,
            headers=item_headers,
            payload=PayloadRef(json=body),
        )
        envelope.add_item(batch_item)

    def _flush(self) -> "Optional[Envelope]":
        envelope = Envelope(
            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
        )
        with self._lock:
            if len(self._buffer) == 0:
                return None

            self._add_to_envelope(envelope)
            self._buffer.clear()

        self._capture_func(envelope)
        return envelope

    def _record_lost(self, item: "T") -> None:
        pass

    @staticmethod
    def _to_transport_format(item: "T") -> "Any":
        pass

Directory Contents

Dirs: 5 × Files: 36

Name Size Perms Modified Actions
ai DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
crons DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
profiler DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
15.59 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.95 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
49.95 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
61.95 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
959 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.37 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.50 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
24.54 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.60 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.42 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.47 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
0 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
74.09 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.99 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
12.82 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.08 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.59 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
11.85 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
25.08 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
50.33 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
54.36 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
44.41 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.24 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
65.96 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.91 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.70 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
3.00 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.43 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.88 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.14 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.21 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.98 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.12 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
13.16 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
3.85 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.46 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).