Preview: arq.py
Size: 9.23 KB
//opt/hc_python/lib64/python3.12/site-packages/sentry_sdk/integrations/arq.py
import sys
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import Transaction, TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
_register_control_flow_exception,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
parse_version,
reraise,
)
try:
import arq.worker
from arq.connections import ArqRedis
from arq.version import VERSION as ARQ_VERSION
from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker
except ImportError:
raise DidNotEnable("Arq is not installed")
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Dict, Optional, Union
from arq.cron import CronJob
from arq.jobs import Job
from arq.typing import WorkerCoroutine
from arq.worker import Function
from sentry_sdk._types import Event, EventProcessor, ExcInfo, Hint
# Exception types that arq uses to steer job execution rather than to signal
# a bug. This module registers them with the SDK as control-flow exceptions
# and never reports them as error events.
ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)
class ArqIntegration(Integration):
    """Sentry integration that instruments the arq job queue."""

    identifier = "arq"
    # Span/transaction origin string attached to everything this module creates.
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once() -> None:
        """Check the installed arq version and install all monkey patches.

        ``ARQ_VERSION`` is either a plain version string or an object that
        exposes a ``version`` sequence; both shapes are reduced to a value
        that is handed to the minimum-version check. A version that cannot be
        parsed is passed on as ``None``.
        """
        try:
            version = (
                parse_version(ARQ_VERSION)
                if isinstance(ARQ_VERSION, str)
                else ARQ_VERSION.version[:2]
            )
        except (TypeError, ValueError):
            version = None

        _check_minimum_version(ArqIntegration, version)

        # Order matches the original: enqueue side, worker run loop, then
        # worker construction.
        for apply_patch in (patch_enqueue_job, patch_run_job, patch_create_worker):
            apply_patch()

        _register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS)  # type: ignore
        ignore_logger("arq.worker")
def patch_enqueue_job() -> None:
    """Replace ``ArqRedis.enqueue_job`` with a version that records a span.

    The wrapper delegates straight to the original method when the arq
    integration is not enabled on the current client. Otherwise the call is
    wrapped in a queue-submit span, created through the streaming span API
    when span streaming is enabled and through ``sentry_sdk.start_span``
    otherwise.
    """
    original_enqueue = ArqRedis.enqueue_job
    saved_kwdefaults = original_enqueue.__kwdefaults__

    async def _sentry_enqueue_job(
        self: "ArqRedis", function: str, *args: "Any", **kwargs: "Any"
    ) -> "Optional[Job]":
        sdk_client = sentry_sdk.get_client()
        if sdk_client.get_integration(ArqIntegration) is None:
            return await original_enqueue(self, function, *args, **kwargs)

        # Pick the span flavour first so the delegated call is written once.
        if has_span_streaming_enabled(sdk_client.options):
            submit_span = sentry_sdk.traces.start_span(
                name=function,
                attributes={
                    "sentry.op": OP.QUEUE_SUBMIT_ARQ,
                    "sentry.origin": ArqIntegration.origin,
                },
            )
        else:
            submit_span = sentry_sdk.start_span(
                op=OP.QUEUE_SUBMIT_ARQ,
                name=function,
                origin=ArqIntegration.origin,
            )

        with submit_span:
            return await original_enqueue(self, function, *args, **kwargs)

    # Carry over the original keyword-only defaults onto the wrapper.
    _sentry_enqueue_job.__kwdefaults__ = saved_kwdefaults
    ArqRedis.enqueue_job = _sentry_enqueue_job
def patch_run_job() -> None:
    """Replace ``Worker.run_job`` so each job runs in its own Sentry scope.

    When the integration is enabled, the job executes inside a fresh
    isolation scope (named ``"arq"``, breadcrumbs cleared) and inside either
    a streamed root span or a classic transaction, depending on whether span
    streaming is enabled. The placeholder name ``"unknown arq task"`` is used
    here; the coroutine wrapper and event processor elsewhere in this module
    set the real job name.
    """
    original_run_job = Worker.run_job

    async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None:
        sdk_client = sentry_sdk.get_client()
        if sdk_client.get_integration(ArqIntegration) is None:
            return await original_run_job(self, job_id, score)

        with sentry_sdk.isolation_scope() as job_scope:
            job_scope._name = "arq"
            job_scope.clear_breadcrumbs()

            if has_span_streaming_enabled(sdk_client.options):
                # parent_span=None is passed explicitly, as in the original.
                root = sentry_sdk.traces.start_span(
                    name="unknown arq task",
                    attributes={
                        "sentry.op": OP.QUEUE_TASK_ARQ,
                        "sentry.origin": ArqIntegration.origin,
                        "sentry.span.source": SegmentSource.TASK,
                        SPANDATA.MESSAGING_MESSAGE_ID: job_id,
                    },
                    parent_span=None,
                )
            else:
                root = sentry_sdk.start_transaction(
                    Transaction(
                        name="unknown arq task",
                        status="ok",
                        op=OP.QUEUE_TASK_ARQ,
                        source=TransactionSource.TASK,
                        origin=ArqIntegration.origin,
                    )
                )

            with root:
                return await original_run_job(self, job_id, score)

    Worker.run_job = _sentry_run_job
def _capture_exception(exc_info: "ExcInfo") -> None:
    """Record a job failure on the current transaction and report it to Sentry.

    arq's control-flow exceptions (``ARQ_CONTROL_FLOW_EXCEPTIONS``) mark the
    active transaction, if any, as ``ABORTED`` and are never sent as error
    events. Any other exception marks the transaction ``INTERNAL_ERROR`` and
    is captured as an unhandled event.

    :param exc_info: ``(type, value, traceback)`` triple of the exception
        raised by the job coroutine.
    """
    exc_type = exc_info[0]
    # Use issubclass rather than an exact-type membership test so that
    # user-defined subclasses of Retry / RetryJob / JobExecutionFailed are
    # treated as control flow as well. The None guard keeps an empty
    # exc_info triple from raising TypeError here.
    is_control_flow = exc_type is not None and issubclass(
        exc_type, ARQ_CONTROL_FLOW_EXCEPTIONS
    )

    transaction = sentry_sdk.get_current_scope().transaction
    if transaction is not None:
        transaction.set_status(
            SPANSTATUS.ABORTED if is_control_flow else SPANSTATUS.INTERNAL_ERROR
        )

    # Control-flow exceptions are skipped even when no transaction is active.
    if is_control_flow:
        return

    event, hint = event_from_exception(
        exc_info,
        client_options=sentry_sdk.get_client().options,
        mechanism={"type": ArqIntegration.identifier, "handled": False},
    )
    sentry_sdk.capture_event(event, hint=hint)
def _make_event_processor(
    ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any"
) -> "EventProcessor":
    """Build an event processor that annotates events with arq job details.

    :param ctx: Job context; the keys ``job_name``, ``job_id`` and
        ``job_try`` are read from it.
    :param args: Positional arguments the job was invoked with.
    :param kwargs: Keyword arguments the job was invoked with.
    :returns: A callable taking ``(event, hint)`` and returning the event.
    """

    def _pii_or_placeholder(value: "Any") -> "Any":
        # Job arguments may carry user data; only attach them when the
        # client is configured to send default PII.
        if should_send_default_pii():
            return value
        return SENSITIVE_DATA_SUBSTITUTE

    def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":
        # Failures while enriching the event must not prevent it being sent.
        with capture_internal_exceptions():
            active_transaction = sentry_sdk.get_current_scope().transaction
            if active_transaction is not None:
                task_name = ctx["job_name"]
                active_transaction.name = task_name
                event["transaction"] = task_name

            event_tags = event.setdefault("tags", {})
            event_tags["arq_task_id"] = ctx["job_id"]
            event_tags["arq_task_retry"] = ctx["job_try"] > 1

            event_extra = event.setdefault("extra", {})
            event_extra["arq-job"] = {
                "task": ctx["job_name"],
                "args": _pii_or_placeholder(args),
                "kwargs": _pii_or_placeholder(kwargs),
                "retry": ctx["job_try"],
            }

        return event

    return event_processor
def _wrap_coroutine(name: str, coroutine: "WorkerCoroutine") -> "WorkerCoroutine":
    """Wrap an arq job coroutine so its failures are reported to Sentry.

    :param name: Job name used for the span / transaction name and for the
        ``job_name`` entry handed to the event processor.
    :param coroutine: The original job coroutine function.
    :returns: A coroutine function with the same ``(ctx, *args, **kwargs)``
        calling convention.
    """

    async def _sentry_coroutine(
        ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any"
    ) -> "Any":
        sdk_client = sentry_sdk.get_client()
        if sdk_client.get_integration(ArqIntegration) is None:
            return await coroutine(ctx, *args, **kwargs)

        if has_span_streaming_enabled(sdk_client.options):
            current_scope = sentry_sdk.get_current_scope()
            streamed = current_scope.streamed_span
            if streamed is not None:
                streamed.name = name
            # NOTE(review): the source this was reconstructed from had lost
            # its indentation; this call is assumed to sit at the level of
            # the streaming branch rather than under the
            # `streamed is not None` check. Confirm against upstream.
            current_scope.set_transaction_name(name)

        job_ctx = {**ctx, "job_name": name}
        sentry_sdk.get_isolation_scope().add_event_processor(
            _make_event_processor(job_ctx, *args, **kwargs)
        )

        try:
            outcome = await coroutine(ctx, *args, **kwargs)
        except Exception:
            failure = sys.exc_info()
            _capture_exception(failure)
            # Propagate the original exception so arq's own retry / failure
            # handling still sees it.
            reraise(*failure)
        else:
            return outcome

    return _sentry_coroutine
def patch_create_worker() -> None:
    """Replace ``arq.worker.create_worker`` so registered jobs get wrapped.

    Functions and cron jobs can reach ``create_worker`` in three ways, and
    each is handled:

    * a settings ``dict`` (keys ``functions`` / ``cron_jobs``),
    * a settings class or object (attributes ``functions`` / ``cron_jobs``),
    * explicit ``functions=`` / ``cron_jobs=`` keyword arguments.

    In every case the entries are replaced by versions whose coroutine is
    wrapped for Sentry. ``cron_jobs`` is allowed to be ``None`` in all three
    places and is then treated as an empty list, so an explicit
    ``cron_jobs=None`` no longer raises ``TypeError`` inside this wrapper.
    ``functions`` is passed through unchanged.
    """
    old_create_worker = arq.worker.create_worker

    @ensure_integration_enabled(ArqIntegration, old_create_worker)
    def _sentry_create_worker(*args: "Any", **kwargs: "Any") -> "Worker":
        settings_cls = args[0] if args else kwargs.get("settings_cls")

        if isinstance(settings_cls, dict):
            if "functions" in settings_cls:
                settings_cls["functions"] = [
                    _get_arq_function(func) for func in settings_cls["functions"]
                ]
            if "cron_jobs" in settings_cls:
                # `or []` mirrors the attribute branch below: the key may be
                # present with a value of None.
                settings_cls["cron_jobs"] = [
                    _get_arq_cron_job(cron_job)
                    for cron_job in (settings_cls["cron_jobs"] or [])
                ]

        # Deliberately separate `if`s (not `elif`): a plain dict has neither
        # attribute, while a settings class/object is handled here.
        if hasattr(settings_cls, "functions"):
            settings_cls.functions = [  # type: ignore[union-attr]
                _get_arq_function(func)
                for func in settings_cls.functions  # type: ignore[union-attr]
            ]
        if hasattr(settings_cls, "cron_jobs"):
            settings_cls.cron_jobs = [  # type: ignore[union-attr]
                _get_arq_cron_job(cron_job)
                for cron_job in (settings_cls.cron_jobs or [])  # type: ignore[union-attr]
            ]

        if "functions" in kwargs:
            kwargs["functions"] = [
                _get_arq_function(func) for func in kwargs["functions"]
            ]
        if "cron_jobs" in kwargs:
            # An explicit cron_jobs=None must not crash the wrapper.
            kwargs["cron_jobs"] = [
                _get_arq_cron_job(cron_job)
                for cron_job in (kwargs["cron_jobs"] or [])
            ]

        return old_create_worker(*args, **kwargs)

    arq.worker.create_worker = _sentry_create_worker
def _get_arq_function(func: "Union[str, Function, WorkerCoroutine]") -> "Function":
    """Normalise *func* through ``arq.worker.func`` and wrap its coroutine.

    :param func: Anything ``arq.worker.func`` accepts.
    :returns: The resulting arq function object, with its ``coroutine``
        attribute replaced by the Sentry-instrumented wrapper.
    """
    wrapped = arq.worker.func(func)
    original_coroutine = wrapped.coroutine
    wrapped.coroutine = _wrap_coroutine(wrapped.name, original_coroutine)
    return wrapped
def _get_arq_cron_job(cron_job: "CronJob") -> "CronJob":
    """Wrap the coroutine of *cron_job* in place and return the same object.

    :param cron_job: arq cron job whose ``name`` and ``coroutine`` attributes
        are read; ``coroutine`` is overwritten with the instrumented wrapper.
    """
    instrumented = _wrap_coroutine(cron_job.name, cron_job.coroutine)
    cron_job.coroutine = instrumented
    return cron_job
Directory Contents
Dirs: 10 × Files: 73