Preview: beat.py
Size: 8.63 KB
/proc/self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
from typing import TYPE_CHECKING
import sentry_sdk
from sentry_sdk.crons import MonitorStatus, capture_checkin
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.celery.utils import (
_get_humanized_interval,
_now_seconds_since_epoch,
)
from sentry_sdk.utils import (
logger,
match_regex_list,
)
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional, TypeVar, Union
from sentry_sdk._types import (
MonitorConfig,
MonitorConfigScheduleType,
MonitorConfigScheduleUnit,
)
F = TypeVar("F", bound=Callable[..., Any])
try:
from celery import Celery, Task # type: ignore
from celery.beat import Scheduler # type: ignore
from celery.schedules import crontab, schedule # type: ignore
from celery.signals import ( # type: ignore
task_failure,
task_retry,
task_success,
)
except ImportError:
raise DidNotEnable("Celery not installed")
try:
from redbeat.schedulers import RedBeatScheduler # type: ignore
except ImportError:
RedBeatScheduler = None
def _get_headers(task: "Task") -> "dict[str, Any]":
    """
    Build a flat header mapping for ``task`` from its request.

    The result is assembled from three sources; on key collisions the later
    source wins:

    1. the request's top-level ``headers`` mapping,
    2. a nested ``headers`` mapping inside it (merged into the result, with
       the nested ``headers`` key itself removed),
    3. the request's ``properties`` mapping.

    A new dict is returned on every call, so the mappings stored on
    ``task.request`` are never modified.
    """
    request = task.request

    # Copy first: updating the request's own dict in place would drop the
    # nested "headers" key from the live request and leak "properties" into it.
    headers = dict(request.get("headers") or {})

    # flatten nested headers (a missing, None or empty nested value is a no-op)
    nested_headers = headers.pop("headers", None)
    if nested_headers:
        headers.update(nested_headers)

    headers.update(request.get("properties") or {})

    return headers
def _get_monitor_config(
    celery_schedule: "Any", app: "Celery", monitor_name: str
) -> "MonitorConfig":
    """
    Translate a Celery Beat schedule into a Sentry Crons monitor config.

    ``crontab`` schedules produce a ``"crontab"`` schedule whose value is the
    five original crontab fields joined by single spaces. Plain ``schedule``
    (interval) schedules produce an ``"interval"`` schedule using the value
    and unit returned by ``_get_humanized_interval``.

    An empty dict is returned, after logging a warning, when the interval is
    expressed in seconds or when the schedule is of any other type.

    The ``timezone`` entry is the string form of the schedule's ``tz`` when
    that is set and non-empty, otherwise the string form of ``app.timezone``
    when that is truthy, otherwise ``"UTC"``.
    """
    schedule_type: "Optional[MonitorConfigScheduleType]" = None
    schedule_value: "Optional[Union[str, int]]" = None
    schedule_unit: "Optional[MonitorConfigScheduleUnit]" = None

    if isinstance(celery_schedule, crontab):
        schedule_type = "crontab"
        schedule_value = (
            "{0._orig_minute} "
            "{0._orig_hour} "
            "{0._orig_day_of_month} "
            "{0._orig_month_of_year} "
            "{0._orig_day_of_week}".format(celery_schedule)
        )
    elif isinstance(celery_schedule, schedule):
        schedule_type = "interval"
        (schedule_value, schedule_unit) = _get_humanized_interval(
            celery_schedule.seconds
        )

        if schedule_unit == "second":
            logger.warning(
                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
                monitor_name,
                schedule_value,
            )
            return {}
    else:
        logger.warning(
            "Celery schedule type '%s' not supported by Sentry Crons.",
            type(celery_schedule),
        )
        return {}

    schedule_config = {"type": schedule_type, "value": schedule_value}
    if schedule_unit is not None:
        schedule_config["unit"] = schedule_unit

    # The config carries a timezone *name*. The schedule's ``tz`` is already
    # stringified; ``app.timezone`` gets the same treatment so that a tzinfo
    # object (which is what Celery exposes there) never ends up in the config.
    schedule_tz = getattr(celery_schedule, "tz", None)
    timezone_name = str(schedule_tz) if schedule_tz is not None else ""
    if not timezone_name and app.timezone:
        timezone_name = str(app.timezone)

    monitor_config: "MonitorConfig" = {
        "schedule": schedule_config,
        "timezone": timezone_name or "UTC",
    }
    return monitor_config
def _apply_crons_data_to_schedule_entry(
    scheduler: "Any",
    schedule_entry: "Any",
    integration: "sentry_sdk.integrations.celery.CeleryIntegration",
) -> None:
    """
    Attach Sentry Crons headers to a Celery Beat schedule entry.

    Nothing happens when beat monitoring is disabled on the integration, when
    the entry name matches ``integration.exclude_beat_tasks``, or when no
    monitor config can be derived from the entry's schedule. Otherwise an
    in-progress check-in is captured and the monitor slug, monitor config and
    check-in id are stored under ``schedule_entry.options["headers"]``.
    """
    if not integration.monitor_beat_tasks:
        return

    slug = schedule_entry.name
    if match_regex_list(slug, integration.exclude_beat_tasks):
        return

    config = _get_monitor_config(schedule_entry.schedule, scheduler.app, slug)
    if not config:
        # An empty config means the schedule cannot be monitored.
        return

    entry_headers = schedule_entry.options.pop("headers", {})
    entry_headers["sentry-monitor-slug"] = slug
    entry_headers["sentry-monitor-config"] = config

    # Open the check-in now; its id travels with the task in the headers.
    entry_headers["sentry-monitor-check-in-id"] = capture_checkin(
        monitor_slug=slug,
        monitor_config=config,
        status=MonitorStatus.IN_PROGRESS,
    )

    # Set the Sentry configuration in the options of the ScheduleEntry.
    # Those will be picked up in `apply_async` and added to the headers.
    schedule_entry.options["headers"] = entry_headers
def _wrap_beat_scheduler(
    original_function: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Wrap a Celery Beat scheduling function with Sentry instrumentation.

    The wrapper makes sure that:
    - a new Sentry trace is started for each task started by Celery Beat and
      it is propagated to the task.
    - the Sentry Crons information is set in the Celery Beat task's
      headers so that it is monitored with Sentry Crons.

    After the patched function is called,
    Celery Beat will call apply_async to put the task in the queue.

    A function that is already wrapped is returned unchanged.
    """
    # Patch only once.
    # The marker is searched in str(...) instead of __name__, because some of
    # our tests mock original_apply_entry.
    if "sentry_patched_scheduler" in str(original_function):
        return original_function

    from sentry_sdk.integrations.celery import CeleryIntegration

    # NOTE: the wrapper's name is the marker checked above; do not rename it.
    def sentry_patched_scheduler(*args: "Any", **kwargs: "Any") -> "Any":
        celery_integration = sentry_sdk.get_client().get_integration(
            CeleryIntegration
        )

        if celery_integration is not None:
            # Tasks started by Celery Beat start a new Trace
            isolation_scope = sentry_sdk.get_isolation_scope()
            isolation_scope.set_new_propagation_context()
            isolation_scope._name = "celery-beat"

            # Exactly two positional arguments are expected here.
            scheduler, schedule_entry = args
            _apply_crons_data_to_schedule_entry(
                scheduler, schedule_entry, celery_integration
            )

        return original_function(*args, **kwargs)

    return sentry_patched_scheduler
def _patch_beat_apply_entry() -> None:
    """
    Replace ``Scheduler.apply_entry`` with its Sentry-wrapped version.
    """
    unpatched_apply_entry = Scheduler.apply_entry
    Scheduler.apply_entry = _wrap_beat_scheduler(unpatched_apply_entry)
def _patch_redbeat_apply_async() -> None:
    """
    Replace ``RedBeatScheduler.apply_async`` with its Sentry-wrapped version.

    Does nothing when ``RedBeatScheduler`` is ``None``, which is the value the
    module assigns when the ``redbeat`` import fails.
    """
    if RedBeatScheduler is not None:
        unpatched_apply_async = RedBeatScheduler.apply_async
        RedBeatScheduler.apply_async = _wrap_beat_scheduler(unpatched_apply_async)
def _setup_celery_beat_signals(monitor_beat_tasks: bool) -> None:
    """
    Connect the Crons receivers to Celery's task outcome signals.

    Nothing is connected when ``monitor_beat_tasks`` is falsy.
    """
    if not monitor_beat_tasks:
        return

    receivers = (
        (task_success, crons_task_success),
        (task_failure, crons_task_failure),
        (task_retry, crons_task_retry),
    )
    for signal, receiver in receivers:
        signal.connect(receiver)
def _capture_final_beat_checkin(sender: "Task", status: "Any") -> None:
    """
    Send the closing check-in for a task that carries Sentry Crons headers.

    Shared implementation of the three ``crons_task_*`` signal receivers,
    which differ only in the status they report. Tasks whose headers contain
    no ``sentry-monitor-slug`` are ignored.

    The duration is the time elapsed since the
    ``sentry-monitor-start-timestamp-s`` header, or ``None`` when that header
    is missing or empty.

    :param sender: the task the signal was sent for.
    :param status: the ``MonitorStatus`` value to report.
    """
    headers = _get_headers(sender)
    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})
    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=(
            _now_seconds_since_epoch() - float(start_timestamp_s)
            if start_timestamp_s
            else None
        ),
        status=status,
    )


def crons_task_success(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """
    ``task_success`` receiver: close the task's check-in with status OK.
    """
    logger.debug("celery_task_success %s", sender)
    _capture_final_beat_checkin(sender, MonitorStatus.OK)


def crons_task_failure(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """
    ``task_failure`` receiver: close the task's check-in with status ERROR.
    """
    logger.debug("celery_task_failure %s", sender)
    _capture_final_beat_checkin(sender, MonitorStatus.ERROR)


def crons_task_retry(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """
    ``task_retry`` receiver: close the task's check-in with status ERROR.
    """
    logger.debug("celery_task_retry %s", sender)
    _capture_final_beat_checkin(sender, MonitorStatus.ERROR)
Directory Contents
Dirs: 1 × Files: 3