PHP 8.3.31
Preview: beat.py Size: 8.63 KB
//opt/hc_python/lib64/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py

from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.crons import MonitorStatus, capture_checkin
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.celery.utils import (
    _get_humanized_interval,
    _now_seconds_since_epoch,
)
from sentry_sdk.utils import (
    logger,
    match_regex_list,
)

if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import Any, Optional, TypeVar, Union

    from sentry_sdk._types import (
        MonitorConfig,
        MonitorConfigScheduleType,
        MonitorConfigScheduleUnit,
    )

    F = TypeVar("F", bound=Callable[..., Any])


try:
    from celery import Celery, Task  # type: ignore
    from celery.beat import Scheduler  # type: ignore
    from celery.schedules import crontab, schedule  # type: ignore
    from celery.signals import (  # type: ignore
        task_failure,
        task_retry,
        task_success,
    )
except ImportError:
    raise DidNotEnable("Celery not installed")

try:
    from redbeat.schedulers import RedBeatScheduler  # type: ignore
except ImportError:
    RedBeatScheduler = None


def _get_headers(task: "Task") -> "dict[str, Any]":
    headers = task.request.get("headers") or {}

    # flatten nested headers
    if "headers" in headers:
        headers.update(headers["headers"])
        del headers["headers"]

    headers.update(task.request.get("properties") or {})

    return headers


def _get_monitor_config(
    celery_schedule: "Any", app: "Celery", monitor_name: str
) -> "MonitorConfig":
    monitor_config: "MonitorConfig" = {}
    schedule_type: "Optional[MonitorConfigScheduleType]" = None
    schedule_value: "Optional[Union[str, int]]" = None
    schedule_unit: "Optional[MonitorConfigScheduleUnit]" = None

    if isinstance(celery_schedule, crontab):
        schedule_type = "crontab"
        schedule_value = (
            "{0._orig_minute} "
            "{0._orig_hour} "
            "{0._orig_day_of_month} "
            "{0._orig_month_of_year} "
            "{0._orig_day_of_week}".format(celery_schedule)
        )
    elif isinstance(celery_schedule, schedule):
        schedule_type = "interval"
        (schedule_value, schedule_unit) = _get_humanized_interval(
            celery_schedule.seconds
        )

        if schedule_unit == "second":
            logger.warning(
                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
                monitor_name,
                schedule_value,
            )
            return {}

    else:
        logger.warning(
            "Celery schedule type '%s' not supported by Sentry Crons.",
            type(celery_schedule),
        )
        return {}

    monitor_config["schedule"] = {}
    monitor_config["schedule"]["type"] = schedule_type
    monitor_config["schedule"]["value"] = schedule_value

    if schedule_unit is not None:
        monitor_config["schedule"]["unit"] = schedule_unit

    monitor_config["timezone"] = (
        (
            hasattr(celery_schedule, "tz")
            and celery_schedule.tz is not None
            and str(celery_schedule.tz)
        )
        or app.timezone
        or "UTC"
    )

    return monitor_config


def _apply_crons_data_to_schedule_entry(
    scheduler: "Any",
    schedule_entry: "Any",
    integration: "sentry_sdk.integrations.celery.CeleryIntegration",
) -> None:
    """
    Add Sentry Crons information to the schedule_entry headers.
    """
    if not integration.monitor_beat_tasks:
        return

    monitor_name = schedule_entry.name

    task_should_be_excluded = match_regex_list(
        monitor_name, integration.exclude_beat_tasks
    )
    if task_should_be_excluded:
        return

    celery_schedule = schedule_entry.schedule
    app = scheduler.app

    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

    is_supported_schedule = bool(monitor_config)
    if not is_supported_schedule:
        return

    headers = schedule_entry.options.pop("headers", {})
    headers.update(
        {
            "sentry-monitor-slug": monitor_name,
            "sentry-monitor-config": monitor_config,
        }
    )

    check_in_id = capture_checkin(
        monitor_slug=monitor_name,
        monitor_config=monitor_config,
        status=MonitorStatus.IN_PROGRESS,
    )
    headers.update({"sentry-monitor-check-in-id": check_in_id})

    # Set the Sentry configuration in the options of the ScheduleEntry.
    # Those will be picked up in `apply_async` and added to the headers.
    schedule_entry.options["headers"] = headers


def _wrap_beat_scheduler(
    original_function: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Makes sure that:
    - a new Sentry trace is started for each task started by Celery Beat and
      it is propagated to the task.
    - the Sentry Crons information is set in the Celery Beat task's
      headers so that is monitored with Sentry Crons.

    After the patched function is called,
    Celery Beat will call apply_async to put the task in the queue.
    """
    # Patch only once
    # Can't use __name__ here, because some of our tests mock original_apply_entry
    already_patched = "sentry_patched_scheduler" in str(original_function)
    if already_patched:
        return original_function

    from sentry_sdk.integrations.celery import CeleryIntegration

    def sentry_patched_scheduler(*args: "Any", **kwargs: "Any") -> None:
        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
        if integration is None:
            return original_function(*args, **kwargs)

        # Tasks started by Celery Beat start a new Trace
        scope = sentry_sdk.get_isolation_scope()
        scope.set_new_propagation_context()
        scope._name = "celery-beat"

        scheduler, schedule_entry = args
        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)

        return original_function(*args, **kwargs)

    return sentry_patched_scheduler


def _patch_beat_apply_entry() -> None:
    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)


def _patch_redbeat_apply_async() -> None:
    if RedBeatScheduler is None:
        return

    RedBeatScheduler.apply_async = _wrap_beat_scheduler(RedBeatScheduler.apply_async)


def _setup_celery_beat_signals(monitor_beat_tasks: bool) -> None:
    if monitor_beat_tasks:
        task_success.connect(crons_task_success)
        task_failure.connect(crons_task_failure)
        task_retry.connect(crons_task_retry)


def crons_task_success(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    logger.debug("celery_task_success %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=(
            _now_seconds_since_epoch() - float(start_timestamp_s)
            if start_timestamp_s
            else None
        ),
        status=MonitorStatus.OK,
    )


def crons_task_failure(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    logger.debug("celery_task_failure %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=(
            _now_seconds_since_epoch() - float(start_timestamp_s)
            if start_timestamp_s
            else None
        ),
        status=MonitorStatus.ERROR,
    )


def crons_task_retry(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    logger.debug("celery_task_retry %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=(
            _now_seconds_since_epoch() - float(start_timestamp_s)
            if start_timestamp_s
            else None
        ),
        status=MonitorStatus.ERROR,
    )

Directory Contents

Dirs: 1 × Files: 3

Name Size Perms Modified Actions
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
8.63 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
972 B lrw-r--r-- 2026-06-11 06:30:31
Edit Download
21.96 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).