REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 8.63 KB
Close
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/celery/beat.py
Text
Base64
"""Sentry Crons support for tasks scheduled by Celery Beat."""

from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.crons import MonitorStatus, capture_checkin
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.celery.utils import (
    _get_humanized_interval,
    _now_seconds_since_epoch,
)
from sentry_sdk.utils import (
    logger,
    match_regex_list,
)

if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import Any, Optional, TypeVar, Union

    from sentry_sdk._types import (
        MonitorConfig,
        MonitorConfigScheduleType,
        MonitorConfigScheduleUnit,
    )

    F = TypeVar("F", bound=Callable[..., Any])


try:
    from celery import Celery, Task  # type: ignore
    from celery.beat import Scheduler  # type: ignore
    from celery.schedules import crontab, schedule  # type: ignore
    from celery.signals import (  # type: ignore
        task_failure,
        task_retry,
        task_success,
    )
except ImportError:
    raise DidNotEnable("Celery not installed")

try:
    from redbeat.schedulers import RedBeatScheduler  # type: ignore
except ImportError:
    # RedBeat is optional; `_patch_redbeat_apply_async` is a no-op without it.
    RedBeatScheduler = None


def _get_headers(task: "Task") -> "dict[str, Any]":
    """
    Return the headers of ``task.request`` as one flat dict.

    A nested ``"headers"`` entry is merged into the top level and removed,
    then the request's ``"properties"`` are merged on top. The returned dict
    is a copy, so the dict held by the request is not modified.
    """
    # Copy before flattening: updating the request's own dict in place would
    # change the task request as a side effect of merely reading it.
    headers = dict(task.request.get("headers") or {})

    # flatten nested headers
    if "headers" in headers:
        headers.update(headers["headers"])
        del headers["headers"]

    headers.update(task.request.get("properties") or {})

    return headers


def _get_monitor_config(
    celery_schedule: "Any", app: "Celery", monitor_name: str
) -> "MonitorConfig":
    """
    Build the monitor config for a Celery schedule.

    Handles ``crontab`` and ``schedule`` (interval) instances. Returns an
    empty dict, after logging a warning, when the schedule type is not one of
    those two or when the interval is expressed in seconds.
    """
    monitor_config: "MonitorConfig" = {}
    schedule_type: "Optional[MonitorConfigScheduleType]" = None
    schedule_value: "Optional[Union[str, int]]" = None
    schedule_unit: "Optional[MonitorConfigScheduleUnit]" = None

    if isinstance(celery_schedule, crontab):
        schedule_type = "crontab"
        schedule_value = (
            "{0._orig_minute} "
            "{0._orig_hour} "
            "{0._orig_day_of_month} "
            "{0._orig_month_of_year} "
            "{0._orig_day_of_week}".format(celery_schedule)
        )
    elif isinstance(celery_schedule, schedule):
        schedule_type = "interval"
        (schedule_value, schedule_unit) = _get_humanized_interval(
            celery_schedule.seconds
        )

        if schedule_unit == "second":
            logger.warning(
                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
                monitor_name,
                schedule_value,
            )
            return {}
    else:
        logger.warning(
            "Celery schedule type '%s' not supported by Sentry Crons.",
            type(celery_schedule),
        )
        return {}

    monitor_config["schedule"] = {}
    monitor_config["schedule"]["type"] = schedule_type
    monitor_config["schedule"]["value"] = schedule_value

    if schedule_unit is not None:
        monitor_config["schedule"]["unit"] = schedule_unit

    # Timezone preference: the schedule's own tz (when set and non-empty),
    # then the app's timezone, then "UTC".
    schedule_tz = getattr(celery_schedule, "tz", None)
    schedule_tz_name = str(schedule_tz) if schedule_tz is not None else ""
    monitor_config["timezone"] = schedule_tz_name or app.timezone or "UTC"

    return monitor_config


def _apply_crons_data_to_schedule_entry(
    scheduler: "Any",
    schedule_entry: "Any",
    integration: "sentry_sdk.integrations.celery.CeleryIntegration",
) -> None:
    """
    Add Sentry Crons information to the schedule_entry headers.

    Does nothing when beat monitoring is disabled on the integration, when
    the entry name matches ``integration.exclude_beat_tasks``, or when no
    monitor config can be built for the entry's schedule. Otherwise an
    in-progress check-in is captured and its id is stored in the headers
    together with the monitor slug and config.
    """
    if not integration.monitor_beat_tasks:
        return

    monitor_name = schedule_entry.name

    task_should_be_excluded = match_regex_list(
        monitor_name, integration.exclude_beat_tasks
    )
    if task_should_be_excluded:
        return

    celery_schedule = schedule_entry.schedule
    app = scheduler.app

    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

    # An empty config means the schedule is not supported.
    is_supported_schedule = bool(monitor_config)
    if not is_supported_schedule:
        return

    headers = schedule_entry.options.pop("headers", {})
    headers.update(
        {
            "sentry-monitor-slug": monitor_name,
            "sentry-monitor-config": monitor_config,
        }
    )

    check_in_id = capture_checkin(
        monitor_slug=monitor_name,
        monitor_config=monitor_config,
        status=MonitorStatus.IN_PROGRESS,
    )
    headers.update({"sentry-monitor-check-in-id": check_in_id})

    # Set the Sentry configuration in the options of the ScheduleEntry.
    # Those will be picked up in `apply_async` and added to the headers.
    schedule_entry.options["headers"] = headers


def _wrap_beat_scheduler(
    original_function: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Makes sure that:
    - a new Sentry trace is started for each task started by Celery Beat and
      it is propagated to the task.
    - the Sentry Crons information is set in the Celery Beat task's
      headers so that is monitored with Sentry Crons.

    After the patched function is called,
    Celery Beat will call apply_async to put the task in the queue.
    """
    # Patch only once
    # Can't use __name__ here, because some of our tests mock original_apply_entry
    # NOTE: this check relies on the wrapper's own name appearing in
    # `str(function)`, so the wrapper must not be decorated with
    # `functools.wraps` (that would replace the name with the original's).
    already_patched = "sentry_patched_scheduler" in str(original_function)
    if already_patched:
        return original_function

    from sentry_sdk.integrations.celery import CeleryIntegration

    def sentry_patched_scheduler(*args: "Any", **kwargs: "Any") -> None:
        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
        if integration is None:
            return original_function(*args, **kwargs)

        # Tasks started by Celery Beat start a new Trace
        scope = sentry_sdk.get_isolation_scope()
        scope.set_new_propagation_context()
        scope._name = "celery-beat"

        # Only the first two positionals (scheduler, entry) are needed here;
        # any further positional arguments are passed through untouched
        # instead of breaking a strict two-element unpacking.
        scheduler, schedule_entry = args[:2]
        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)

        return original_function(*args, **kwargs)

    return sentry_patched_scheduler


def _patch_beat_apply_entry() -> None:
    """Replace ``Scheduler.apply_entry`` with its Sentry-wrapped version."""
    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)


def _patch_redbeat_apply_async() -> None:
    """
    Replace ``RedBeatScheduler.apply_async`` with its Sentry-wrapped version.

    Does nothing when RedBeat could not be imported.
    """
    if RedBeatScheduler is None:
        return

    RedBeatScheduler.apply_async = _wrap_beat_scheduler(RedBeatScheduler.apply_async)


def _setup_celery_beat_signals(monitor_beat_tasks: bool) -> None:
    """Connect the Crons signal handlers when ``monitor_beat_tasks`` is true."""
    if monitor_beat_tasks:
        task_success.connect(crons_task_success)
        task_failure.connect(crons_task_failure)
        task_retry.connect(crons_task_retry)


def _capture_task_checkin(sender: "Task", status: "Any") -> None:
    """
    Capture the closing check-in for a task, using the given ``status``.

    Shared by the success, failure and retry signal handlers. Does nothing
    when the task's headers carry no ``sentry-monitor-slug``. The duration is
    only reported when a start timestamp is present in the headers.
    """
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=(
            _now_seconds_since_epoch() - float(start_timestamp_s)
            if start_timestamp_s
            else None
        ),
        status=status,
    )


def crons_task_success(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """Signal handler: report an OK check-in for the finished task."""
    logger.debug("celery_task_success %s", sender)
    _capture_task_checkin(sender, MonitorStatus.OK)


def crons_task_failure(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """Signal handler: report an ERROR check-in for the failed task."""
    logger.debug("celery_task_failure %s", sender)
    _capture_task_checkin(sender, MonitorStatus.ERROR)


def crons_task_retry(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
    """Signal handler: report an ERROR check-in for the task being retried."""
    logger.debug("celery_task_retry %s", sender)
    _capture_task_checkin(sender, MonitorStatus.ERROR)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 3
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
beat.py
8.63 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
utils.py
972 B
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
21.96 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).