PHP 8.3.31
Preview: aiomysql.py Size: 9.09 KB
/proc/thread-self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/aiomysql.py

# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import Any, Awaitable, Callable, TypeVar

import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing_utils import (
    add_query_source,
    has_span_streaming_enabled,
    record_sql_queries,
)
from sentry_sdk.utils import (
    capture_internal_exceptions,
    parse_version,
)

try:
    import aiomysql  # type: ignore[import-not-found]
    from aiomysql.connection import Connection  # type: ignore[import-not-found]
    from aiomysql.cursors import Cursor  # type: ignore[import-not-found]
except ImportError:
    raise DidNotEnable("aiomysql not installed.")


class AioMySQLIntegration(Integration):
    identifier = "aiomysql"
    origin = f"auto.db.{identifier}"
    _record_params = False

    def __init__(self, *, record_params: bool = False):
        AioMySQLIntegration._record_params = record_params

    @staticmethod
    def setup_once() -> None:
        aiomysql_version = parse_version(aiomysql.__version__)
        _check_minimum_version(AioMySQLIntegration, aiomysql_version)

        Cursor.execute = _wrap_execute(Cursor.execute)
        Cursor.executemany = _wrap_executemany(Cursor.executemany)

        # Patch Connection._connect — this catches ALL connections:
        #   - aiomysql.connect()
        #   - aiomysql.create_pool() (pool.py does `from .connection import connect`
        #     which ultimately calls Connection._connect)
        #   - Reconnects
        Connection._connect = _wrap_connect(Connection._connect)


T = TypeVar("T")


def _normalize_query(query: str | bytes | bytearray) -> str:
    if isinstance(query, (bytes, bytearray)):
        query = query.decode("utf-8", errors="replace")
    return " ".join(query.split())


def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Wrap Cursor.execute to capture SQL queries."""

    async def _inner(*args: Any, **kwargs: Any) -> T:
        if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None:
            return await f(*args, **kwargs)

        cursor = args[0]

        # Skip if flagged by executemany (avoids double-recording).
        # Do NOT reset the flag here — it must stay True for the entire
        # duration of executemany, which may call execute multiple times
        # in a loop (non-INSERT fallback). Only _wrap_executemany's
        # finally block should clear it.
        if getattr(cursor, "_sentry_skip_next_execute", False):
            return await f(*args, **kwargs)

        query = args[1] if len(args) > 1 else kwargs.get("query", "")
        query_str = _normalize_query(query)
        params = args[2] if len(args) > 2 else kwargs.get("args")

        conn = _get_connection(cursor)

        integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration)
        params_list = params if integration and integration._record_params else None
        param_style = "pyformat" if params_list else None

        with record_sql_queries(
            cursor=None,
            query=query_str,
            params_list=params_list,
            paramstyle=param_style,
            executemany=False,
            span_origin=AioMySQLIntegration.origin,
        ) as span:
            if conn:
                _set_db_data(span, conn)
            res = await f(*args, **kwargs)
            if isinstance(span, StreamedSpan):
                with capture_internal_exceptions():
                    add_query_source(span)

        if not isinstance(span, StreamedSpan):
            with capture_internal_exceptions():
                add_query_source(span)

        return res

    return _inner


def _wrap_executemany(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Wrap Cursor.executemany to capture SQL queries."""

    async def _inner(*args: Any, **kwargs: Any) -> T:
        if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None:
            return await f(*args, **kwargs)

        cursor = args[0]
        query = args[1] if len(args) > 1 else kwargs.get("query", "")
        query_str = _normalize_query(query)
        seq_of_params = args[2] if len(args) > 2 else kwargs.get("args")

        conn = _get_connection(cursor)

        integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration)
        params_list = (
            seq_of_params if integration and integration._record_params else None
        )
        param_style = "pyformat" if params_list else None

        # Prevent double-recording: _do_execute_many calls self.execute internally
        cursor._sentry_skip_next_execute = True
        try:
            with record_sql_queries(
                cursor=None,
                query=query_str,
                params_list=params_list,
                paramstyle=param_style,
                executemany=True,
                span_origin=AioMySQLIntegration.origin,
            ) as span:
                if conn:
                    _set_db_data(span, conn)
                res = await f(*args, **kwargs)
                if isinstance(span, StreamedSpan):
                    with capture_internal_exceptions():
                        add_query_source(span)

            if not isinstance(span, StreamedSpan):
                with capture_internal_exceptions():
                    add_query_source(span)

            return res
        finally:
            cursor._sentry_skip_next_execute = False

    return _inner


def _get_connection(cursor: Any) -> Any:
    """Get the underlying connection from a cursor."""
    return getattr(cursor, "connection", None)


def _wrap_connect(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Wrap Connection._connect to capture connection spans."""

    async def _inner(self: "Connection") -> T:
        client = sentry_sdk.get_client()
        if client.get_integration(AioMySQLIntegration) is None:
            return await f(self)

        if has_span_streaming_enabled(client.options):
            breadcrumb_data = _get_connect_data(self, use_streaming_keys=True)

            span_attributes: dict[str, Any] = {
                "sentry.op": OP.DB,
                "sentry.origin": AioMySQLIntegration.origin,
            } | breadcrumb_data

            with sentry_sdk.traces.start_span(
                name="connect", attributes=span_attributes
            ) as span:
                with capture_internal_exceptions():
                    sentry_sdk.add_breadcrumb(
                        message="connect", category="query", data=breadcrumb_data
                    )
                res = await f(self)
        else:
            connect_data = _get_connect_data(self)

            with sentry_sdk.start_span(
                op=OP.DB,
                name="connect",
                origin=AioMySQLIntegration.origin,
            ) as span:
                _set_db_data(span, self)

                with capture_internal_exceptions():
                    sentry_sdk.add_breadcrumb(
                        message="connect",
                        category="query",
                        data=connect_data,
                    )
                res = await f(self)

        return res

    return _inner


def _get_connect_data(conn: Any, *, use_streaming_keys: bool = False) -> dict[str, Any]:
    if use_streaming_keys:
        db_system = SPANDATA.DB_SYSTEM_NAME
        db_name = SPANDATA.DB_NAMESPACE
    else:
        db_system = SPANDATA.DB_SYSTEM
        db_name = SPANDATA.DB_NAME

    data: dict[str, Any] = {
        db_system: "mysql",
        SPANDATA.DB_DRIVER_NAME: "aiomysql",
    }

    host = getattr(conn, "host", None)
    if host is not None:
        data[SPANDATA.SERVER_ADDRESS] = host

    port = getattr(conn, "port", None)
    if port is not None:
        data[SPANDATA.SERVER_PORT] = port

    database = getattr(conn, "db", None)
    if database is not None:
        data[db_name] = database

    user = getattr(conn, "user", None)
    if user is not None:
        data[SPANDATA.DB_USER] = user

    return data


def _set_db_data(span: Any, conn: Any) -> None:
    """Set database-related span data from connection object."""
    if isinstance(span, StreamedSpan):
        set_value = span.set_attribute
        db_system = SPANDATA.DB_SYSTEM_NAME
        db_name = SPANDATA.DB_NAMESPACE
    else:
        # Remove this else block once we've completely migrated to streamed spans
        # The use of deprecated attributes here is to ensure backwards compatibility
        set_value = span.set_data
        db_system = SPANDATA.DB_SYSTEM
        db_name = SPANDATA.DB_NAME

    set_value(db_system, "mysql")
    set_value(SPANDATA.DB_DRIVER_NAME, "aiomysql")

    host = getattr(conn, "host", None)
    if host is not None:
        set_value(SPANDATA.SERVER_ADDRESS, host)

    port = getattr(conn, "port", None)
    if port is not None:
        set_value(SPANDATA.SERVER_PORT, port)

    database = getattr(conn, "db", None)
    if database is not None:
        set_value(db_name, database)

    user = getattr(conn, "user", None)
    if user is not None:
        set_value(SPANDATA.DB_USER, user)

Directory Contents

Dirs: 10 × Files: 73

Name Size Perms Modified Actions
celery DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
django DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
grpc DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
redis DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
spark DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
19.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.09 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
39.00 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
876 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.70 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.23 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
20.06 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.68 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
17.41 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.91 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
6.20 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.21 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.85 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.49 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.44 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.86 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.02 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.25 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.93 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.04 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.27 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.57 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.72 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.93 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.71 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.79 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.80 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.19 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
15.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
48.31 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
18.13 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.87 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
13.03 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
11.46 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
15.69 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
6.35 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
23.12 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
787 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
53.38 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.08 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.99 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
4.41 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
8.21 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.42 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
6.82 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.32 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.75 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.81 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
9.44 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
15.25 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.58 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.02 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.24 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
27.93 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
11.04 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.19 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
14.01 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
17.39 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
2.35 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
6.88 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
10.79 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.67 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.72 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.02 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.65 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
15.03 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
4.00 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
12.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).