Preview: continuous_profiler.py
Size: 21.80 KB
/proc/thread-self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/profiler/continuous_profiler.py
import atexit
import os
import random
import sys
import threading
import time
import uuid
import warnings
from collections import deque
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from sentry_sdk._lru_cache import LRUCache
from sentry_sdk.consts import VERSION
from sentry_sdk.envelope import Envelope
from sentry_sdk.profiler.utils import (
DEFAULT_SAMPLING_FREQUENCY,
extract_stack,
)
from sentry_sdk.utils import (
capture_internal_exception,
is_gevent,
logger,
now,
set_in_app_in_frames,
)
# Names in this block exist for static type checkers only. The annotations in
# this module that refer to them are written as strings, so they are not
# looked up at runtime.
if TYPE_CHECKING:
    from typing import Any, Callable, Deque, Dict, List, Optional, Set, Type, Union
    from typing_extensions import TypedDict
    from sentry_sdk._types import ContinuousProfilerMode, SDKInfo
    from sentry_sdk.profiler.utils import (
        ExtractedSample,
        FrameId,
        ProcessedFrame,
        ProcessedStack,
        StackId,
        ThreadId,
    )

    # Shape of one entry appended to `ProfileChunk.samples`.
    ProcessedSample = TypedDict(
        "ProcessedSample",
        {
            "timestamp": float,
            "thread_id": ThreadId,
            "stack_id": int,
        },
    )
# gevent is optional. When it can be imported, keep a handle on its ThreadPool
# and on the `time.sleep` returned by `get_original`; otherwise fall back to
# the stdlib `time.sleep` and mark the pool as unavailable with None.
try:
    from gevent.monkey import get_original
    from gevent.threadpool import ThreadPool as _ThreadPool

    ThreadPool: "Optional[Type[_ThreadPool]]" = _ThreadPool
    thread_sleep = get_original("time", "sleep")
except ImportError:
    thread_sleep = time.sleep
    ThreadPool = None

# Module-wide scheduler. Stays None until `setup_continuous_profiler` creates
# one, and is reset to None by `teardown_continuous_profiler`.
_scheduler: "Optional[ContinuousScheduler]" = None
def setup_continuous_profiler(
    options: "Dict[str, Any]",
    sdk_info: "SDKInfo",
    capture_func: "Callable[[Envelope], None]",
) -> bool:
    """
    Create the module-level continuous profiling scheduler.

    If a scheduler already exists, it is torn down and replaced.

    :param options: Client options. `profiler_mode` selects the scheduler;
        when it is unset, `_experiments["continuous_profiling_mode"]` is
        consulted before falling back to the default for the environment.
    :param sdk_info: SDK metadata passed through to the scheduler.
    :param capture_func: Callable passed through to the scheduler.
    :returns: `True` once a scheduler has been created.
    :raises ValueError: If the resolved profiler mode matches no scheduler.
    """
    global _scheduler

    already_initialized = _scheduler is not None

    if already_initialized:
        logger.debug("[Profiling] Continuous Profiler is already setup")
        teardown_continuous_profiler()

    if is_gevent():
        # If gevent has patched the threading modules then we cannot rely on
        # them to spawn a native thread for sampling.
        # Instead we default to the GeventContinuousScheduler which is capable of
        # spawning native threads within gevent.
        default_profiler_mode = GeventContinuousScheduler.mode
    else:
        default_profiler_mode = ThreadContinuousScheduler.mode

    if options.get("profiler_mode") is not None:
        profiler_mode = options["profiler_mode"]
    else:
        # TODO: deprecate this and just use the existing `profiler_mode`
        #
        # `_experiments` can be present but explicitly set to None. Fall back
        # to an empty dict so the lookup below does not fail on None.
        experiments = options.get("_experiments") or {}

        profiler_mode = (
            experiments.get("continuous_profiling_mode") or default_profiler_mode
        )

    frequency = DEFAULT_SAMPLING_FREQUENCY

    if profiler_mode == ThreadContinuousScheduler.mode:
        _scheduler = ThreadContinuousScheduler(
            frequency, options, sdk_info, capture_func
        )
    elif profiler_mode == GeventContinuousScheduler.mode:
        _scheduler = GeventContinuousScheduler(
            frequency, options, sdk_info, capture_func
        )
    else:
        raise ValueError("Unknown continuous profiler mode: {}".format(profiler_mode))

    logger.debug(
        "[Profiling] Setting up continuous profiler in {mode} mode".format(
            mode=_scheduler.mode
        )
    )

    # Register the exit hook only on the first setup so that repeated setups
    # do not stack up duplicate handlers.
    if not already_initialized:
        atexit.register(teardown_continuous_profiler)

    return True
def is_profile_session_sampled() -> bool:
    """Return the scheduler's sampling decision, or False if no scheduler is set up."""
    scheduler = _scheduler
    return scheduler.sampled if scheduler is not None else False
def try_autostart_continuous_profiler() -> None:
    """Start the profiler when a scheduler exists and reports auto start as enabled."""
    # TODO: deprecate this as it'll be replaced by the auto lifecycle option
    if _scheduler is not None and _scheduler.is_auto_start_enabled():
        _scheduler.manual_start()
def try_profile_lifecycle_trace_start() -> "Union[ContinuousProfile, None]":
    """Return the result of the scheduler's `auto_start`, or None without a scheduler."""
    return None if _scheduler is None else _scheduler.auto_start()
def start_profiler() -> None:
    """Forward to the scheduler's `manual_start`; does nothing without a scheduler."""
    if _scheduler is not None:
        _scheduler.manual_start()
def start_profile_session() -> None:
    """
    Deprecated alias for `start_profiler`.

    Emits a `DeprecationWarning` attributed to the caller (`stacklevel=2`)
    and then delegates to `start_profiler`.
    """
    # The replacement is `start_profiler`; the previous message pointed users
    # at `start_profile`, which is not a function in this module.
    warnings.warn(
        "The `start_profile_session` function is deprecated. Please use `start_profiler` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    start_profiler()
def stop_profiler() -> None:
    """Forward to the scheduler's `manual_stop`; does nothing without a scheduler."""
    if _scheduler is not None:
        _scheduler.manual_stop()
def stop_profile_session() -> None:
    """
    Deprecated alias for `stop_profiler`.

    Emits a `DeprecationWarning` attributed to the caller (`stacklevel=2`)
    and then delegates to `stop_profiler`.
    """
    # The replacement is `stop_profiler`; the previous message pointed users
    # at `stop_profile`, which is not a function in this module.
    warnings.warn(
        "The `stop_profile_session` function is deprecated. Please use `stop_profiler` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    stop_profiler()
def teardown_continuous_profiler() -> None:
    """Call `stop_profiler`, then clear the module-level scheduler."""
    global _scheduler

    stop_profiler()
    _scheduler = None
def get_profiler_id() -> "Union[str, None]":
    """Return the scheduler's `profiler_id`, or None when no scheduler is set up."""
    scheduler = _scheduler
    if scheduler is not None:
        return scheduler.profiler_id
    return None
def determine_profile_session_sampling_decision(
    sample_rate: "Union[float, None]",
) -> bool:
    """
    Decide whether this profile session is sampled.

    :param sample_rate: Probability of sampling; `None` is treated as `0.0`.
    :returns: `False` for a falsy rate, otherwise whether a fresh random draw
        falls below the rate.
    """
    if sample_rate:
        # Only draw a random number when the rate is non-zero.
        return random.random() < float(sample_rate)

    return False
class ContinuousProfile:
    """Handle for one profile; `active` stays True until `stop` is called."""

    # Class-level default, shadowed on the instance by `stop`.
    active: bool = True

    def stop(self) -> None:
        """Mark this profile as no longer active."""
        self.active = False
class ContinuousScheduler:
    """
    Base class for the continuous profiling schedulers.

    It owns the state shared by every scheduler: the sampler, the profile
    buffer, the lifecycle checks and the sampling loop in `run`. Subclasses
    implement `ensure_running` and `teardown` to start and stop whatever
    executes `run`.
    """

    mode: "ContinuousProfilerMode" = "unknown"

    def __init__(
        self,
        frequency: int,
        options: "Dict[str, Any]",
        sdk_info: "SDKInfo",
        capture_func: "Callable[[Envelope], None]",
    ) -> None:
        """
        :param frequency: Samples per second; the loop interval is `1.0 / frequency`.
        :param options: Client options. Reads `profile_lifecycle`,
            `profile_session_sample_rate` and `_experiments`.
        :param sdk_info: SDK metadata passed on to the profile buffer.
        :param capture_func: Callable passed on to the profile buffer.
        """
        self.interval = 1.0 / frequency
        self.options = options
        self.sdk_info = sdk_info
        self.capture_func = capture_func

        self.lifecycle = self.options.get("profile_lifecycle")
        profile_session_sample_rate = self.options.get("profile_session_sample_rate")
        # The sampling decision is made once, when the scheduler is created.
        self.sampled = determine_profile_session_sampling_decision(
            profile_session_sample_rate
        )

        # `make_sampler` reads `self.lifecycle`, so it must run after it is set.
        self.sampler = self.make_sampler()
        self.buffer: "Optional[ProfileBuffer]" = None
        self.pid: "Optional[int]" = None

        self.running = False
        self.soft_shutdown = False

        self.new_profiles: "Deque[ContinuousProfile]" = deque(maxlen=128)
        self.active_profiles: "Set[ContinuousProfile]" = set()

    def is_auto_start_enabled(self) -> bool:
        """
        Report whether the experimental auto start option applies.

        :returns: `False` if the scheduler already recorded the current
            process id or no experiments are configured; otherwise the
            truthiness of `_experiments["continuous_profiling_auto_start"]`.
        """
        # Ensure that the scheduler only autostarts once per process.
        # This is necessary because many web servers use forks to spawn
        # additional processes. And the profiler is only spawned on the
        # master process, then it often only profiles the main process
        # and not the ones where the requests are being handled.
        if self.pid == os.getpid():
            return False

        experiments = self.options.get("_experiments")
        if not experiments:
            return False

        # Coerce to bool so that a missing key yields False instead of None,
        # matching the annotated return type.
        return bool(experiments.get("continuous_profiling_auto_start"))

    def auto_start(self) -> "Union[ContinuousProfile, None]":
        """
        Start a profile for the "trace" lifecycle.

        :returns: A new `ContinuousProfile` queued for the sampler, or `None`
            when the session is not sampled or the lifecycle is not "trace".
        """
        if not self.sampled:
            return None

        if self.lifecycle != "trace":
            return None

        logger.debug("[Profiling] Auto starting profiler")

        profile = ContinuousProfile()

        # Queue the profile; the sampler moves it into `active_profiles`.
        self.new_profiles.append(profile)
        self.ensure_running()

        return profile

    def manual_start(self) -> None:
        """Ensure the profiler runs, but only if sampled and the lifecycle is "manual"."""
        if not self.sampled:
            return

        if self.lifecycle != "manual":
            return

        self.ensure_running()

    def manual_stop(self) -> None:
        """Tear the profiler down, but only if the lifecycle is "manual"."""
        if self.lifecycle != "manual":
            return

        self.teardown()

    def ensure_running(self) -> None:
        """Start the sampling loop if needed. Implemented by subclasses."""
        raise NotImplementedError

    def teardown(self) -> None:
        """Stop the sampling loop. Implemented by subclasses."""
        raise NotImplementedError

    def pause(self) -> None:
        """Not implemented in this base class."""
        raise NotImplementedError

    def reset_buffer(self) -> None:
        """Replace the current buffer with a fresh `ProfileBuffer`."""
        self.buffer = ProfileBuffer(
            self.options, self.sdk_info, PROFILE_BUFFER_SECONDS, self.capture_func
        )

    @property
    def profiler_id(self) -> "Union[str, None]":
        """The buffer's profiler id, or `None` when not running or no buffer exists."""
        if not self.running or self.buffer is None:
            return None
        return self.buffer.profiler_id

    def make_sampler(self) -> "Callable[..., bool]":
        """
        Build the sampling callable used by `run`.

        The callable returns `True` to request a soft shutdown of the loop and
        `False` otherwise. Only the "trace" lifecycle variant ever returns `True`.
        """
        cwd = os.getcwd()

        cache = LRUCache(max_size=256)

        if self.lifecycle == "trace":

            def _sample_stack(*args: "Any", **kwargs: "Any") -> bool:
                """
                Take a sample of the stack on all the threads in the process.
                This should be called at a regular interval to collect samples.
                """

                # no profiles taking place, so we can stop early
                if not self.new_profiles and not self.active_profiles:
                    return True

                # This is the number of profiles we want to pop off.
                # It's possible another thread adds a new profile to
                # the list and we spend longer than we want inside
                # the loop below.
                #
                # Also make sure to set this value before extracting
                # frames so we do not write to any new profiles that
                # were started after this point.
                new_profiles = len(self.new_profiles)

                ts = now()

                try:
                    sample = [
                        (str(tid), extract_stack(frame, cache, cwd))
                        for tid, frame in sys._current_frames().items()
                    ]
                except AttributeError:
                    # For some reason, the frame we get doesn't have certain attributes.
                    # When this happens, we abandon the current sample as it's bad.
                    capture_internal_exception(sys.exc_info())
                    return False

                # Move the new profiles into the active_profiles set.
                #
                # We cannot directly add them to the active_profiles set
                # in `auto_start` because it is called from other threads,
                # which can cause a RuntimeError when the set size changes
                # during iteration without a lock.
                #
                # We also want to avoid using a lock here so threads
                # that are starting profiles are not blocked until they
                # can acquire the lock.
                for _ in range(new_profiles):
                    self.active_profiles.add(self.new_profiles.popleft())
                inactive_profiles = []

                for profile in self.active_profiles:
                    if not profile.active:
                        # If a profile is marked inactive, we buffer it
                        # to `inactive_profiles` so it can be removed.
                        # We cannot remove it here as it would result
                        # in a RuntimeError.
                        inactive_profiles.append(profile)

                for profile in inactive_profiles:
                    self.active_profiles.remove(profile)

                if self.buffer is not None:
                    self.buffer.write(ts, sample)

                return False

        else:

            def _sample_stack(*args: "Any", **kwargs: "Any") -> bool:
                """
                Take a sample of the stack on all the threads in the process.
                This should be called at a regular interval to collect samples.
                """

                ts = now()

                try:
                    sample = [
                        (str(tid), extract_stack(frame, cache, cwd))
                        for tid, frame in sys._current_frames().items()
                    ]
                except AttributeError:
                    # For some reason, the frame we get doesn't have certain attributes.
                    # When this happens, we abandon the current sample as it's bad.
                    capture_internal_exception(sys.exc_info())
                    return False

                if self.buffer is not None:
                    self.buffer.write(ts, sample)

                return False

        return _sample_stack

    def run(self) -> None:
        """
        Sampling loop: call the sampler once per interval while `running` is set.

        The loop ends when `running` is cleared or when the sampler requests a
        soft shutdown. The buffer, if any, is flushed on the way out.
        """
        last = time.perf_counter()

        while self.running:
            self.soft_shutdown = self.sampler()

            # some time may have elapsed since the last time
            # we sampled, so we need to account for that and
            # not sleep for too long
            elapsed = time.perf_counter() - last
            if elapsed < self.interval:
                thread_sleep(self.interval - elapsed)

            # the soft shutdown happens here to give it a chance
            # for the profiler to be reused
            if self.soft_shutdown:
                self.running = False

                # make sure to explicitly exit the profiler here or there might
                # be multiple profilers at once
                break

            # after sleeping, make sure to take the current
            # timestamp so we can use it next iteration
            last = time.perf_counter()

        # Read the attribute once so that the None check and the flush act on
        # the same buffer object.
        buffer = self.buffer
        if buffer is not None:
            buffer.flush()
class ThreadContinuousScheduler(ContinuousScheduler):
    """
    Scheduler that drives the sampler from a daemon thread, invoking it
    once per sampling interval.
    """

    mode: "ContinuousProfilerMode" = "thread"
    name = "sentry.profiler.ThreadContinuousScheduler"

    def __init__(
        self,
        frequency: int,
        options: "Dict[str, Any]",
        sdk_info: "SDKInfo",
        capture_func: "Callable[[Envelope], None]",
    ) -> None:
        """Initialise the base scheduler, then the thread handle and its lock."""
        super().__init__(frequency, options, sdk_info, capture_func)

        self.thread: "Optional[threading.Thread]" = None
        self.lock = threading.Lock()

    def ensure_running(self) -> None:
        """Start the sampler thread unless one is already running in this process."""
        self.soft_shutdown = False

        current_pid = os.getpid()

        def _running_here() -> bool:
            # True when the profiler is running and belongs to this process.
            return self.running and self.pid == current_pid

        if _running_here():
            return

        with self.lock:
            # A competing thread may have started the profiler while we were
            # waiting for the lock, so check once more before proceeding.
            if _running_here():
                return

            self.pid = current_pid
            self.running = True

            # A new profiler thread always gets a new buffer.
            self.reset_buffer()

            # The thread must be a daemon, otherwise it can keep the
            # application alive after every other thread has exited.
            worker = threading.Thread(name=self.name, target=self.run, daemon=True)
            self.thread = worker

            try:
                worker.start()
            except RuntimeError:
                # The interpreter is in a state that no longer allows
                # spawning threads, so give up on profiling.
                self.running = False
                self.thread = None

    def teardown(self) -> None:
        """Signal the loop to stop and wait for the sampler thread to finish."""
        if self.running:
            self.running = False

        worker = self.thread
        if worker is None:
            return

        worker.join()
        self.thread = None
class GeventContinuousScheduler(ContinuousScheduler):
    """
    Variant of the thread scheduler adapted for gevent.

    gevent may monkey patch the threading modules (`threading` and `_thread`),
    which replaces native threads with greenlets. The sampler CANNOT run in a
    greenlet because

    1. Other greenlets doing sync work will prevent the sampler from running
    2. The greenlet shares its thread with other greenlets, so at sampling
       time the other greenlets have been evicted from the thread and the
       sample would contain only the sampler's own code.
    """

    mode: "ContinuousProfilerMode" = "gevent"

    def __init__(
        self,
        frequency: int,
        options: "Dict[str, Any]",
        sdk_info: "SDKInfo",
        capture_func: "Callable[[Envelope], None]",
    ) -> None:
        """
        Initialise the scheduler.

        :raises ValueError: If gevent's `ThreadPool` could not be imported.
        """
        if ThreadPool is None:
            raise ValueError("Profiler mode: {} is not available".format(self.mode))

        super().__init__(frequency, options, sdk_info, capture_func)

        self.thread: "Optional[_ThreadPool]" = None
        self.lock = threading.Lock()

    def ensure_running(self) -> None:
        """Spawn the sampler on a one-worker pool unless it already runs in this process."""
        self.soft_shutdown = False

        current_pid = os.getpid()

        def _running_here() -> bool:
            # True when the profiler is running and belongs to this process.
            return self.running and self.pid == current_pid

        if _running_here():
            return

        with self.lock:
            # A competing thread may have started the profiler while we were
            # waiting for the lock, so check once more before proceeding.
            if _running_here():
                return

            self.pid = current_pid
            self.running = True

            # A new profiler thread always gets a new buffer.
            self.reset_buffer()

            pool = ThreadPool(1)  # type: ignore[misc]
            self.thread = pool

            try:
                pool.spawn(self.run)
            except RuntimeError:
                # The interpreter is in a state that no longer allows
                # spawning threads, so give up on profiling.
                self.running = False
                self.thread = None

    def teardown(self) -> None:
        """Signal the loop to stop and wait for the pool to finish."""
        if self.running:
            self.running = False

        pool = self.thread
        if pool is None:
            return

        pool.join()
        self.thread = None
# Passed to `ProfileBuffer` as its `buffer_size`: once the monotonic time
# elapsed since the chunk was started reaches this many seconds, the buffer
# flushes the chunk and starts a new one.
PROFILE_BUFFER_SECONDS = 60
class ProfileBuffer:
    """
    Collects samples into a `ProfileChunk` and hands the chunk to
    `capture_func` inside an envelope whenever it is flushed.
    """

    def __init__(
        self,
        options: "Dict[str, Any]",
        sdk_info: "SDKInfo",
        buffer_size: int,
        capture_func: "Callable[[Envelope], None]",
    ) -> None:
        """
        :param options: Client options passed to the chunk on serialisation.
        :param sdk_info: SDK metadata passed to the chunk on serialisation.
        :param buffer_size: Monotonic time span after which a chunk is flushed.
        :param capture_func: Called with the envelope holding a flushed chunk.
        """
        self.options = options
        self.sdk_info = sdk_info
        self.buffer_size = buffer_size
        self.capture_func = capture_func

        self.profiler_id = uuid.uuid4().hex
        self.chunk = ProfileChunk()

        # Samples are stamped with the same clock, which keeps the
        # timestamps aligned with this starting point.
        monotonic_start = now()
        self.start_monotonic_time = monotonic_start

        # The wall-clock anchor is fixed once per profiler id so that clock
        # drift cannot affect a single profiler session.
        #
        # Subtracting the monotonic start gives a fixed offset: adding a
        # sample's monotonic time to it yields that sample's timestamp.
        wall_clock_start = datetime.now(timezone.utc).timestamp()
        self.start_timestamp = wall_clock_start - monotonic_start

    def write(self, monotonic_time: float, sample: "ExtractedSample") -> None:
        """Add a sample, first flushing and replacing the chunk if it is due."""
        if self.should_flush(monotonic_time):
            self.flush()
            self.chunk = ProfileChunk()
            self.start_monotonic_time = now()

        timestamp = self.start_timestamp + monotonic_time
        self.chunk.write(timestamp, sample)

    def should_flush(self, monotonic_time: float) -> bool:
        """Return True once the time since the chunk started reaches `buffer_size`."""
        elapsed = monotonic_time - self.start_monotonic_time
        return elapsed >= self.buffer_size

    def flush(self) -> None:
        """Serialise the current chunk and pass it to `capture_func` in an envelope."""
        serialized = self.chunk.to_json(self.profiler_id, self.options, self.sdk_info)

        envelope = Envelope()
        envelope.add_profile_chunk(serialized)
        self.capture_func(envelope)
class ProfileChunk:
    """
    One chunk of profile data: de-duplicated frames and stacks plus the
    samples that refer to them by index.
    """

    def __init__(self) -> None:
        """Create an empty chunk with a fresh chunk id."""
        self.chunk_id = uuid.uuid4().hex

        # Maps from frame/stack identifiers to their position in the lists below.
        self.indexed_frames: "Dict[FrameId, int]" = {}
        self.indexed_stacks: "Dict[StackId, int]" = {}
        self.frames: "List[ProcessedFrame]" = []
        self.stacks: "List[ProcessedStack]" = []
        self.samples: "List[ProcessedSample]" = []

    def write(self, ts: float, sample: "ExtractedSample") -> None:
        """
        Record one sample per thread entry in `sample`, all stamped with `ts`.

        Stacks and frames seen for the first time are indexed and stored.
        """
        for thread_id, (stack_key, frame_keys, raw_frames) in sample:
            try:
                # A stack that is already indexed needs no frame indexing.
                if stack_key not in self.indexed_stacks:
                    for position, frame_key in enumerate(frame_keys):
                        if frame_key in self.indexed_frames:
                            continue
                        self.indexed_frames[frame_key] = len(self.indexed_frames)
                        self.frames.append(raw_frames[position])

                    self.indexed_stacks[stack_key] = len(self.indexed_stacks)
                    stack = [self.indexed_frames[frame_key] for frame_key in frame_keys]
                    self.stacks.append(stack)

                entry = {
                    "timestamp": ts,
                    "thread_id": thread_id,
                    "stack_id": self.indexed_stacks[stack_key],
                }
                self.samples.append(entry)
            except AttributeError:
                # The frame lacked an expected attribute; report it and
                # abandon this entry as a bad sample.
                capture_internal_exception(sys.exc_info())

    def to_json(
        self, profiler_id: str, options: "Dict[str, Any]", sdk_info: "SDKInfo"
    ) -> "Dict[str, Any]":
        """
        Build the payload for this chunk.

        :param profiler_id: Id of the profiler session the chunk belongs to.
        :param options: Client options; reads `in_app_exclude`,
            `in_app_include`, `project_root`, `release`, `environment`, `dist`.
        :param sdk_info: SDK metadata; its `name` is reported as the client SDK.
        :returns: The chunk payload as a dict.
        """
        # Thread names are taken from the threads alive at serialisation time.
        thread_metadata = {}
        for thread in threading.enumerate():
            thread_metadata[str(thread.ident)] = {"name": str(thread.name)}

        profile = {
            "frames": self.frames,
            "stacks": self.stacks,
            "samples": self.samples,
            "thread_metadata": thread_metadata,
        }

        set_in_app_in_frames(
            self.frames,
            options["in_app_exclude"],
            options["in_app_include"],
            options["project_root"],
        )

        client_sdk = {
            "name": sdk_info["name"],
            "version": VERSION,
        }

        payload = {
            "chunk_id": self.chunk_id,
            "client_sdk": client_sdk,
            "platform": "python",
            "profile": profile,
            "profiler_id": profiler_id,
            "version": "2",
        }

        # Optional attributes are only included when they are set.
        for key in ("release", "environment", "dist"):
            value = options[key]
            if value is not None:
                payload[key] = str(value).strip()

        return payload
Directory Contents
Dirs: 1 × Files: 4