Preview: starlette.py
Size: 27.93 KB
/proc/thread-self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/starlette.py
import functools
import json
import sys
import warnings
from collections.abc import Set
from copy import deepcopy
from json import JSONDecodeError
from typing import TYPE_CHECKING
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import (
_DEFAULT_FAILED_REQUEST_STATUS_CODES,
DidNotEnable,
Integration,
)
from sentry_sdk.integrations._wsgi_common import (
DEFAULT_HTTP_METHODS_TO_CAPTURE,
HttpCodeRangeContainer,
_is_json_content_type,
request_body_within_bounds,
)
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import StreamedSpan, get_current_span
from sentry_sdk.tracing import (
SOURCE_FOR_STYLE,
TransactionSource,
)
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
AnnotatedValue,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
parse_version,
transaction_from_function,
)
if TYPE_CHECKING:
from typing import Any, Awaitable, Callable, Container, Dict, Optional, Tuple, Union
from sentry_sdk._types import Event, HttpStatusCodeRange
try:
import starlette # type: ignore
from starlette import __version__ as STARLETTE_VERSION
from starlette.applications import Starlette # type: ignore
from starlette.datastructures import UploadFile # type: ignore
from starlette.middleware import Middleware # type: ignore
from starlette.middleware.authentication import ( # type: ignore
AuthenticationMiddleware,
)
from starlette.requests import Request # type: ignore
from starlette.routing import Match # type: ignore
from starlette.types import ASGIApp, Receive, Send # type: ignore
from starlette.types import Scope as StarletteScope
except ImportError:
raise DidNotEnable("Starlette is not installed")
try:
# Starlette 0.20
from starlette.middleware.exceptions import ExceptionMiddleware # type: ignore
except ImportError:
# Startlette 0.19.1
from starlette.exceptions import ExceptionMiddleware # type: ignore
try:
# Optional dependency of Starlette to parse form data.
try:
# python-multipart 0.0.13 and later
import python_multipart as multipart # type: ignore
except ImportError:
# python-multipart 0.0.12 and earlier
import multipart # type: ignore
except ImportError:
multipart = None
# Vendored: https://github.com/Kludex/starlette/blob/0a29b5ccdcbd1285c75c4fdb5d62ae1d244a21b0/starlette/_utils.py#L11-L17
if sys.version_info >= (3, 13): # pragma: no cover
from inspect import iscoroutinefunction
else:
from asyncio import iscoroutinefunction
_DEFAULT_TRANSACTION_NAME = "generic Starlette request"
TRANSACTION_STYLE_VALUES = ("endpoint", "url")
class StarletteIntegration(Integration):
identifier = "starlette"
origin = f"auto.http.{identifier}"
transaction_style = ""
def __init__(
self,
transaction_style: str = "url",
failed_request_status_codes: "Union[Set[int], list[HttpStatusCodeRange], None]" = _DEFAULT_FAILED_REQUEST_STATUS_CODES,
middleware_spans: bool = False,
http_methods_to_capture: "tuple[str, ...]" = DEFAULT_HTTP_METHODS_TO_CAPTURE,
):
if transaction_style not in TRANSACTION_STYLE_VALUES:
raise ValueError(
"Invalid value for transaction_style: %s (must be in %s)"
% (transaction_style, TRANSACTION_STYLE_VALUES)
)
self.transaction_style = transaction_style
self.middleware_spans = middleware_spans
self.http_methods_to_capture = tuple(map(str.upper, http_methods_to_capture))
if isinstance(failed_request_status_codes, Set):
self.failed_request_status_codes: "Container[int]" = (
failed_request_status_codes
)
else:
warnings.warn(
"Passing a list or None for failed_request_status_codes is deprecated. "
"Please pass a set of int instead.",
DeprecationWarning,
stacklevel=2,
)
if failed_request_status_codes is None:
self.failed_request_status_codes = _DEFAULT_FAILED_REQUEST_STATUS_CODES
else:
self.failed_request_status_codes = HttpCodeRangeContainer(
failed_request_status_codes
)
@staticmethod
def setup_once() -> None:
version = parse_version(STARLETTE_VERSION)
if version is None:
raise DidNotEnable(
"Unparsable Starlette version: {}".format(STARLETTE_VERSION)
)
patch_middlewares()
patch_asgi_app()
patch_request_response()
if version >= (0, 24):
patch_templates()
def _enable_span_for_middleware(middleware_class: "Any") -> type:
old_call = middleware_class.__call__
async def _create_span_call(
app: "Any",
scope: "Dict[str, Any]",
receive: "Callable[[], Awaitable[Dict[str, Any]]]",
send: "Callable[[Dict[str, Any]], Awaitable[None]]",
**kwargs: "Any",
) -> None:
client = sentry_sdk.get_client()
integration = client.get_integration(StarletteIntegration)
if integration is None:
return await old_call(app, scope, receive, send, **kwargs)
# Update transaction name with middleware name
name, source = _get_transaction_from_middleware(app, scope, integration)
if name is not None:
sentry_sdk.get_current_scope().set_transaction_name(
name,
source=source,
)
if not integration.middleware_spans:
return await old_call(app, scope, receive, send, **kwargs)
middleware_name = app.__class__.__name__
is_span_streaming_enabled = has_span_streaming_enabled(client.options)
def _start_middleware_span(op: str, name: str) -> "Any":
if is_span_streaming_enabled:
return sentry_sdk.traces.start_span(
name=name,
attributes={
"sentry.op": op,
"sentry.origin": StarletteIntegration.origin,
"middleware.name": middleware_name,
},
)
return sentry_sdk.start_span(
op=op,
name=name,
origin=StarletteIntegration.origin,
)
with _start_middleware_span(
op=OP.MIDDLEWARE_STARLETTE, name=middleware_name
) as middleware_span:
if not is_span_streaming_enabled:
middleware_span.set_tag("starlette.middleware_name", middleware_name)
# Creating spans for the "receive" callback
async def _sentry_receive(*args: "Any", **kwargs: "Any") -> "Any":
with _start_middleware_span(
op=OP.MIDDLEWARE_STARLETTE_RECEIVE,
name=getattr(receive, "__qualname__", str(receive)),
) as span:
if not is_span_streaming_enabled:
span.set_tag("starlette.middleware_name", middleware_name)
return await receive(*args, **kwargs)
receive_name = getattr(receive, "__name__", str(receive))
receive_patched = receive_name == "_sentry_receive"
new_receive = _sentry_receive if not receive_patched else receive
# Creating spans for the "send" callback
async def _sentry_send(*args: "Any", **kwargs: "Any") -> "Any":
with _start_middleware_span(
op=OP.MIDDLEWARE_STARLETTE_SEND,
name=getattr(send, "__qualname__", str(send)),
) as span:
if not is_span_streaming_enabled:
span.set_tag("starlette.middleware_name", middleware_name)
return await send(*args, **kwargs)
send_name = getattr(send, "__name__", str(send))
send_patched = send_name == "_sentry_send"
new_send = _sentry_send if not send_patched else send
return await old_call(app, scope, new_receive, new_send, **kwargs)
not_yet_patched = old_call.__name__ not in [
"_create_span_call",
"_sentry_authenticationmiddleware_call",
"_sentry_exceptionmiddleware_call",
]
if not_yet_patched:
middleware_class.__call__ = _create_span_call
return middleware_class
def _serialize_request_body_data(data: "Any") -> str:
# data may be a JSON-serializable value, an AnnotatedValue, or a dict with AnnotatedValue values
def _default(value: "Any") -> "Any":
if isinstance(value, AnnotatedValue):
return value.value
return str(value)
return json.dumps(data, default=_default)
def _set_request_body_data_on_streaming_segment(
info: "Optional[Dict[str, Any]]",
) -> None:
current_span = get_current_span()
if info and "data" in info and type(current_span) is StreamedSpan:
with capture_internal_exceptions():
current_span._segment.set_attribute(
"http.request.body.data",
_serialize_request_body_data(info["data"]),
)
@ensure_integration_enabled(StarletteIntegration)
def _capture_exception(exception: BaseException, handled: "Any" = False) -> None:
event, hint = event_from_exception(
exception,
client_options=sentry_sdk.get_client().options,
mechanism={"type": StarletteIntegration.identifier, "handled": handled},
)
sentry_sdk.capture_event(event, hint=hint)
def patch_exception_middleware(middleware_class: "Any") -> None:
"""
Capture all exceptions in Starlette app and
also extract user information.
"""
old_middleware_init = middleware_class.__init__
not_yet_patched = "_sentry_middleware_init" not in str(old_middleware_init)
if not_yet_patched:
def _sentry_middleware_init(self: "Any", *args: "Any", **kwargs: "Any") -> None:
old_middleware_init(self, *args, **kwargs)
# Patch existing exception handlers
old_handlers = self._exception_handlers.copy()
async def _sentry_patched_exception_handler(
self: "Any", *args: "Any", **kwargs: "Any"
) -> None:
integration = sentry_sdk.get_client().get_integration(
StarletteIntegration
)
exp = args[0]
if integration is not None:
is_http_server_error = (
hasattr(exp, "status_code")
and isinstance(exp.status_code, int)
and exp.status_code in integration.failed_request_status_codes
)
if is_http_server_error:
_capture_exception(exp, handled=True)
# Find a matching handler
old_handler = None
for cls in type(exp).__mro__:
if cls in old_handlers:
old_handler = old_handlers[cls]
break
if old_handler is None:
return
if _is_async_callable(old_handler):
return await old_handler(self, *args, **kwargs)
else:
return old_handler(self, *args, **kwargs)
for key in self._exception_handlers.keys():
self._exception_handlers[key] = _sentry_patched_exception_handler
middleware_class.__init__ = _sentry_middleware_init
old_call = middleware_class.__call__
async def _sentry_exceptionmiddleware_call(
self: "Dict[str, Any]",
scope: "Dict[str, Any]",
receive: "Callable[[], Awaitable[Dict[str, Any]]]",
send: "Callable[[Dict[str, Any]], Awaitable[None]]",
) -> None:
# Also add the user (that was eventually set by be Authentication middle
# that was called before this middleware). This is done because the authentication
# middleware sets the user in the scope and then (in the same function)
# calls this exception middelware. In case there is no exception (or no handler
# for the type of exception occuring) then the exception bubbles up and setting the
# user information into the sentry scope is done in auth middleware and the
# ASGI middleware will then send everything to Sentry and this is fine.
# But if there is an exception happening that the exception middleware here
# has a handler for, it will send the exception directly to Sentry, so we need
# the user information right now.
# This is why we do it here.
_add_user_to_sentry_scope(scope)
await old_call(self, scope, receive, send)
middleware_class.__call__ = _sentry_exceptionmiddleware_call
@ensure_integration_enabled(StarletteIntegration)
def _add_user_to_sentry_scope(scope: "Dict[str, Any]") -> None:
"""
Extracts user information from the ASGI scope and
adds it to Sentry's scope.
"""
if "user" not in scope:
return
if not should_send_default_pii():
return
user_info: "Dict[str, Any]" = {}
starlette_user = scope["user"]
username = getattr(starlette_user, "username", None)
if username:
user_info.setdefault("username", starlette_user.username)
user_id = getattr(starlette_user, "id", None)
if user_id:
user_info.setdefault("id", starlette_user.id)
email = getattr(starlette_user, "email", None)
if email:
user_info.setdefault("email", starlette_user.email)
sentry_scope = sentry_sdk.get_isolation_scope()
sentry_scope.set_user(user_info)
def patch_authentication_middleware(middleware_class: "Any") -> None:
"""
Add user information to Sentry scope.
"""
old_call = middleware_class.__call__
not_yet_patched = "_sentry_authenticationmiddleware_call" not in str(old_call)
if not_yet_patched:
async def _sentry_authenticationmiddleware_call(
self: "Dict[str, Any]",
scope: "Dict[str, Any]",
receive: "Callable[[], Awaitable[Dict[str, Any]]]",
send: "Callable[[Dict[str, Any]], Awaitable[None]]",
) -> None:
await old_call(self, scope, receive, send)
_add_user_to_sentry_scope(scope)
middleware_class.__call__ = _sentry_authenticationmiddleware_call
def patch_middlewares() -> None:
"""
Patches Starlettes `Middleware` class to record
spans for every middleware invoked.
"""
old_middleware_init = Middleware.__init__
not_yet_patched = "_sentry_middleware_init" not in str(old_middleware_init)
if not_yet_patched:
def _sentry_middleware_init(
self: "Any", cls: "Any", *args: "Any", **kwargs: "Any"
) -> None:
if cls == SentryAsgiMiddleware:
return old_middleware_init(self, cls, *args, **kwargs)
span_enabled_cls = _enable_span_for_middleware(cls)
old_middleware_init(self, span_enabled_cls, *args, **kwargs)
if cls == AuthenticationMiddleware:
patch_authentication_middleware(cls)
if cls == ExceptionMiddleware:
patch_exception_middleware(cls)
Middleware.__init__ = _sentry_middleware_init
def patch_asgi_app() -> None:
"""
Instrument Starlette ASGI app using the SentryAsgiMiddleware.
"""
old_app = Starlette.__call__
async def _sentry_patched_asgi_app(
self: "Starlette", scope: "StarletteScope", receive: "Receive", send: "Send"
) -> None:
integration = sentry_sdk.get_client().get_integration(StarletteIntegration)
if integration is None:
return await old_app(self, scope, receive, send)
middleware = SentryAsgiMiddleware(
lambda *a, **kw: old_app(self, *a, **kw),
mechanism_type=StarletteIntegration.identifier,
transaction_style=integration.transaction_style,
span_origin=StarletteIntegration.origin,
http_methods_to_capture=(
integration.http_methods_to_capture
if integration
else DEFAULT_HTTP_METHODS_TO_CAPTURE
),
asgi_version=3,
)
return await middleware(scope, receive, send)
Starlette.__call__ = _sentry_patched_asgi_app
# This was vendored in from Starlette to support Starlette 0.19.1 because
# this function was only introduced in 0.20.x
def _is_async_callable(obj: "Any") -> bool:
while isinstance(obj, functools.partial):
obj = obj.func
return iscoroutinefunction(obj) or (
callable(obj) and iscoroutinefunction(obj.__call__) # type: ignore[operator]
)
def patch_request_response() -> None:
old_request_response = starlette.routing.request_response
def _sentry_request_response(func: "Callable[[Any], Any]") -> "ASGIApp":
old_func = func
is_coroutine = _is_async_callable(old_func)
if is_coroutine:
async def _sentry_async_func(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
integration = client.get_integration(StarletteIntegration)
if integration is None:
return await old_func(*args, **kwargs)
request = args[0]
_set_transaction_name_and_source(
sentry_sdk.get_current_scope(),
integration.transaction_style,
request,
)
sentry_scope = sentry_sdk.get_isolation_scope()
extractor = StarletteRequestExtractor(request)
info = await extractor.extract_request_info()
def _make_request_event_processor(
req: "Any", integration: "Any"
) -> "Callable[[Event, dict[str, Any]], Event]":
def event_processor(
event: "Event", hint: "Dict[str, Any]"
) -> "Event":
# Add info from request to event
request_info = event.get("request", {})
if info:
if "cookies" in info:
request_info["cookies"] = info["cookies"]
if "data" in info:
request_info["data"] = info["data"]
event["request"] = deepcopy(request_info)
return event
return event_processor
sentry_scope._name = StarletteIntegration.identifier
sentry_scope.add_event_processor(
_make_request_event_processor(request, integration)
)
if has_span_streaming_enabled(client.options):
_set_request_body_data_on_streaming_segment(info)
return await old_func(*args, **kwargs)
func = _sentry_async_func
else:
@functools.wraps(old_func)
def _sentry_sync_func(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
integration = client.get_integration(StarletteIntegration)
if integration is None:
return old_func(*args, **kwargs)
current_scope = sentry_sdk.get_current_scope()
span_streaming = has_span_streaming_enabled(client.options)
if span_streaming:
current_span = current_scope.streamed_span
if type(current_span) is StreamedSpan:
current_span._segment._update_active_thread()
elif current_scope.transaction is not None:
current_scope.transaction.update_active_thread()
sentry_scope = sentry_sdk.get_isolation_scope()
if sentry_scope.profile is not None:
sentry_scope.profile.update_active_thread_id()
request = args[0]
_set_transaction_name_and_source(
sentry_scope, integration.transaction_style, request
)
extractor = StarletteRequestExtractor(request)
cookies = extractor.extract_cookies_from_request()
def _make_request_event_processor(
req: "Any", integration: "Any"
) -> "Callable[[Event, dict[str, Any]], Event]":
def event_processor(
event: "Event", hint: "dict[str, Any]"
) -> "Event":
# Extract information from request
request_info = event.get("request", {})
if cookies:
request_info["cookies"] = cookies
event["request"] = deepcopy(request_info)
return event
return event_processor
sentry_scope._name = StarletteIntegration.identifier
sentry_scope.add_event_processor(
_make_request_event_processor(request, integration)
)
return old_func(*args, **kwargs)
func = _sentry_sync_func
return old_request_response(func)
starlette.routing.request_response = _sentry_request_response
def patch_templates() -> None:
# If markupsafe is not installed, then Jinja2 is not installed
# (markupsafe is a dependency of Jinja2)
# In this case we do not need to patch the Jinja2Templates class
try:
from markupsafe import Markup
except ImportError:
return # Nothing to do
# https://github.com/Kludex/starlette/commit/96479daca2e4bd8157f68d914fd162aa94eff73a
try:
from starlette.templating import Jinja2Templates # type: ignore
except ImportError:
return
old_jinja2templates_init = Jinja2Templates.__init__
not_yet_patched = "_sentry_jinja2templates_init" not in str(
old_jinja2templates_init
)
if not_yet_patched:
def _sentry_jinja2templates_init(
self: "Jinja2Templates", *args: "Any", **kwargs: "Any"
) -> None:
def add_sentry_trace_meta(request: "Request") -> "Dict[str, Any]":
trace_meta = Markup(
sentry_sdk.get_current_scope().trace_propagation_meta()
)
return {
"sentry_trace_meta": trace_meta,
}
kwargs.setdefault("context_processors", [])
if add_sentry_trace_meta not in kwargs["context_processors"]:
kwargs["context_processors"].append(add_sentry_trace_meta)
return old_jinja2templates_init(self, *args, **kwargs)
Jinja2Templates.__init__ = _sentry_jinja2templates_init
class StarletteRequestExtractor:
"""
Extracts useful information from the Starlette request
(like form data or cookies) and adds it to the Sentry event.
"""
request: "Request" = None
def __init__(self: "StarletteRequestExtractor", request: "Request") -> None:
self.request = request
def extract_cookies_from_request(
self: "StarletteRequestExtractor",
) -> "Optional[Dict[str, Any]]":
cookies: "Optional[Dict[str, Any]]" = None
if should_send_default_pii():
cookies = self.cookies()
return cookies
async def extract_request_info(
self: "StarletteRequestExtractor",
) -> "Optional[Dict[str, Any]]":
client = sentry_sdk.get_client()
request_info: "Dict[str, Any]" = {}
with capture_internal_exceptions():
# Add cookies
if should_send_default_pii():
request_info["cookies"] = self.cookies()
# If there is no body, just return the cookies
content_length = await self.content_length()
if not content_length:
return request_info
# Add annotation if body is too big
if content_length and not request_body_within_bounds(
client, content_length
):
request_info["data"] = AnnotatedValue.removed_because_over_size_limit()
return request_info
# Add JSON body, if it is a JSON request
json = await self.json()
if json:
request_info["data"] = json
return request_info
# Add form as key/value pairs, if request has form data
form = await self.form()
if form:
form_data = {}
for key, val in form.items():
is_file = isinstance(val, UploadFile)
form_data[key] = (
val
if not is_file
else AnnotatedValue.removed_because_raw_data()
)
request_info["data"] = form_data
return request_info
# Raw data, do not add body just an annotation
request_info["data"] = AnnotatedValue.removed_because_raw_data()
return request_info
async def content_length(self: "StarletteRequestExtractor") -> "Optional[int]":
if "content-length" in self.request.headers:
return int(self.request.headers["content-length"])
return None
def cookies(self: "StarletteRequestExtractor") -> "Dict[str, Any]":
return self.request.cookies
async def form(self: "StarletteRequestExtractor") -> "Any":
if multipart is None:
return None
# Parse the body first to get it cached, as Starlette does not cache form() as it
# does with body() and json() https://github.com/encode/starlette/discussions/1933
# Calling `.form()` without calling `.body()` first will
# potentially break the users project.
await self.request.body()
return await self.request.form()
def is_json(self: "StarletteRequestExtractor") -> bool:
return _is_json_content_type(self.request.headers.get("content-type"))
async def json(self: "StarletteRequestExtractor") -> "Optional[Dict[str, Any]]":
if not self.is_json():
return None
try:
return await self.request.json()
except JSONDecodeError:
return None
def _transaction_name_from_router(scope: "StarletteScope") -> "Optional[str]":
router = scope.get("router")
if not router:
return None
for route in router.routes:
match = route.matches(scope)
if match[0] == Match.FULL:
try:
return route.path
except AttributeError:
# routes added via app.host() won't have a path attribute
return scope.get("path")
return None
def _set_transaction_name_and_source(
scope: "sentry_sdk.Scope", transaction_style: str, request: "Any"
) -> None:
name = None
source = SOURCE_FOR_STYLE[transaction_style]
if transaction_style == "endpoint":
endpoint = request.scope.get("endpoint")
if endpoint:
name = transaction_from_function(endpoint) or None
elif transaction_style == "url":
name = _transaction_name_from_router(request.scope)
if name is None:
name = _DEFAULT_TRANSACTION_NAME
source = TransactionSource.ROUTE
scope.set_transaction_name(name, source=source)
def _get_transaction_from_middleware(
app: "Any", asgi_scope: "Dict[str, Any]", integration: "StarletteIntegration"
) -> "Tuple[Optional[str], Optional[str]]":
name = None
source = None
if integration.transaction_style == "endpoint":
name = transaction_from_function(app.__class__)
source = TransactionSource.COMPONENT
elif integration.transaction_style == "url":
name = _transaction_name_from_router(asgi_scope)
source = TransactionSource.ROUTE
return name, source
Directory Contents
Dirs: 10 × Files: 73