PHP 8.3.31
Preview: langchain.py Size: 48.31 KB
/proc/thread-self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/langchain.py

import itertools
import json
import sys
import warnings
from collections import OrderedDict
from functools import wraps
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.ai.utils import (
    GEN_AI_ALLOWED_MESSAGE_ROLES,
    get_start_span_function,
    normalize_message_roles,
    set_data_normalized,
    transform_content_part,
    truncate_and_annotate_messages,
)
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing_utils import (
    _get_value,
    has_span_streaming_enabled,
    should_truncate_gen_ai_input,
)
from sentry_sdk.utils import capture_internal_exceptions, logger

if TYPE_CHECKING:
    from typing import (
        Any,
        AsyncIterator,
        Callable,
        Dict,
        Iterator,
        List,
        Optional,
        Union,
    )
    from uuid import UUID

    from sentry_sdk._types import TextPart
    from sentry_sdk.tracing import Span


try:
    from langchain_core.agents import AgentFinish
    from langchain_core.callbacks import (
        BaseCallbackHandler,
        BaseCallbackManager,
        Callbacks,
        manager,
    )
    from langchain_core.messages import BaseMessage
    from langchain_core.outputs import LLMResult

except ImportError:
    raise DidNotEnable("langchain not installed")


try:
    # >=v1
    from langchain_classic.agents import AgentExecutor  # type: ignore[import-not-found]
except ImportError:
    try:
        # <v1
        from langchain.agents import AgentExecutor
    # Catch TypeError due to changes in type hint evaluation order: https://github.com/pydantic/pydantic/issues/13036
    except (ImportError, TypeError):
        AgentExecutor = None


# Conditional imports for embeddings providers
try:
    from langchain_openai import OpenAIEmbeddings  # type: ignore[import-not-found]
except ImportError:
    OpenAIEmbeddings = None

try:
    from langchain_openai import AzureOpenAIEmbeddings
except ImportError:
    AzureOpenAIEmbeddings = None

try:
    from langchain_google_vertexai import (  # type: ignore[import-not-found]
        VertexAIEmbeddings,
    )
except ImportError:
    VertexAIEmbeddings = None

try:
    from langchain_aws import BedrockEmbeddings  # type: ignore[import-not-found]
except ImportError:
    BedrockEmbeddings = None

try:
    from langchain_cohere import CohereEmbeddings  # type: ignore[import-not-found]
except ImportError:
    CohereEmbeddings = None

try:
    from langchain_mistralai import (  # type: ignore[import-not-found]
        MistralAIEmbeddings,
    )
except ImportError:
    MistralAIEmbeddings = None

try:
    from langchain_huggingface import (  # type: ignore[import-not-found]
        HuggingFaceEmbeddings,
    )
except ImportError:
    HuggingFaceEmbeddings = None

try:
    from langchain_ollama import OllamaEmbeddings  # type: ignore[import-not-found]
except ImportError:
    OllamaEmbeddings = None


def _get_ai_system(all_params: "Dict[str, Any]") -> "Optional[str]":
    ai_type = all_params.get("_type")

    if not ai_type or not isinstance(ai_type, str):
        return None

    return ai_type


DATA_FIELDS = {
    "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY,
    "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
    "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
    "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY,
    "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
    "tool_calls": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
    "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
    "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
}


def _transform_langchain_content_block(
    content_block: "Dict[str, Any]",
) -> "Dict[str, Any]":
    """
    Transform a LangChain content block using the shared transform_content_part function.

    Returns the original content block if transformation is not applicable
    (e.g., for text blocks or unrecognized formats).
    """
    result = transform_content_part(content_block)
    return result if result is not None else content_block


def _transform_langchain_message_content(content: "Any") -> "Any":
    """
    Transform LangChain message content, handling both string content and
    list of content blocks.
    """
    if isinstance(content, str):
        return content

    if isinstance(content, (list, tuple)):
        transformed = []
        for block in content:
            if isinstance(block, dict):
                transformed.append(_transform_langchain_content_block(block))
            else:
                transformed.append(block)
        return transformed

    return content


def _get_system_instructions(messages: "List[List[BaseMessage]]") -> "List[str]":
    system_instructions = []

    for list_ in messages:
        for message in list_:
            # type of content: str | list[str | dict] | None
            if message.type == "system" and isinstance(message.content, str):
                system_instructions.append(message.content)

            elif message.type == "system" and isinstance(message.content, list):
                for item in message.content:
                    if isinstance(item, str):
                        system_instructions.append(item)

                    elif isinstance(item, dict) and item.get("type") == "text":
                        instruction = item.get("text")
                        if isinstance(instruction, str):
                            system_instructions.append(instruction)

    return system_instructions


def _transform_system_instructions(
    system_instructions: "List[str]",
) -> "List[TextPart]":
    return [
        {
            "type": "text",
            "content": instruction,
        }
        for instruction in system_instructions
    ]


class LangchainIntegration(Integration):
    identifier = "langchain"
    origin = f"auto.ai.{identifier}"

    def __init__(
        self: "LangchainIntegration",
        include_prompts: bool = True,
        max_spans: "Optional[int]" = None,
    ) -> None:
        self.include_prompts = include_prompts
        self.max_spans = max_spans

        if max_spans is not None:
            warnings.warn(
                "The `max_spans` parameter of `LangchainIntegration` is "
                "deprecated and will be removed in version 3.0 of sentry-sdk.",
                DeprecationWarning,
                stacklevel=2,
            )

    @staticmethod
    def setup_once() -> None:
        manager._configure = _wrap_configure(manager._configure)

        if AgentExecutor is not None:
            AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke)
            AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream)

        # Patch embeddings providers
        _patch_embeddings_provider(OpenAIEmbeddings)
        _patch_embeddings_provider(AzureOpenAIEmbeddings)
        _patch_embeddings_provider(VertexAIEmbeddings)
        _patch_embeddings_provider(BedrockEmbeddings)
        _patch_embeddings_provider(CohereEmbeddings)
        _patch_embeddings_provider(MistralAIEmbeddings)
        _patch_embeddings_provider(HuggingFaceEmbeddings)
        _patch_embeddings_provider(OllamaEmbeddings)


class SentryLangchainCallback(BaseCallbackHandler):  # type: ignore[misc]
    """Callback handler that creates Sentry spans."""

    def __init__(
        self, max_span_map_size: "Optional[int]", include_prompts: bool
    ) -> None:
        self.span_map: "OrderedDict[UUID, Union[sentry_sdk.tracing.Span, StreamedSpan]]" = OrderedDict()
        self.max_span_map_size = max_span_map_size
        self.include_prompts = include_prompts

    def gc_span_map(self) -> None:
        if self.max_span_map_size is not None:
            while len(self.span_map) > self.max_span_map_size:
                run_id, span = self.span_map.popitem(last=False)
                self._exit_span(span, run_id)

    def _handle_error(self, run_id: "UUID", error: "Any") -> None:
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            sentry_sdk.capture_exception(
                error, span._scope if isinstance(span, StreamedSpan) else span.scope
            )

            span.__exit__(type(error), error, error.__traceback__)
            del self.span_map[run_id]

    def _normalize_langchain_message(self, message: "BaseMessage") -> "Any":
        # Transform content to handle multimodal data (images, audio, video, files)
        transformed_content = _transform_langchain_message_content(message.content)
        parsed = {"role": message.type, "content": transformed_content}
        parsed.update(message.additional_kwargs)
        return parsed

    def _create_span(
        self: "SentryLangchainCallback",
        run_id: "UUID",
        parent_id: "Optional[Any]",
        op: str,
        name: str,
        origin: str,
    ) -> "Union[sentry_sdk.tracing.Span, StreamedSpan]":
        span = None
        if parent_id:
            parent_span: "Optional[Union[sentry_sdk.tracing.Span, StreamedSpan]]" = (
                self.span_map.get(parent_id)
            )
            if parent_span:
                span = (
                    sentry_sdk.traces.start_span(
                        parent_span=parent_span,
                        name=name,
                        attributes={
                            "sentry.op": op,
                            "sentry.origin": origin,
                        },
                    )
                    if isinstance(parent_span, StreamedSpan)
                    else parent_span.start_child(op=op, name=name, origin=origin)
                )

        if span is None:
            span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
            span = (
                sentry_sdk.traces.start_span(
                    name=name,
                    attributes={
                        "sentry.op": op,
                        "sentry.origin": origin,
                    },
                )
                if span_streaming
                else sentry_sdk.start_span(op=op, name=name, origin=origin)
            )

        span.__enter__()
        self.span_map[run_id] = span
        self.gc_span_map()
        return span

    def _exit_span(
        self: "SentryLangchainCallback",
        span: "Union[sentry_sdk.tracing.Span, StreamedSpan]",
        run_id: "UUID",
    ) -> None:
        span.__exit__(None, None, None)
        del self.span_map[run_id]

    def on_llm_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        prompts: "List[str]",
        *,
        run_id: "UUID",
        tags: "Optional[List[str]]" = None,
        parent_run_id: "Optional[UUID]" = None,
        metadata: "Optional[Dict[str, Any]]" = None,
        **kwargs: "Any",
    ) -> "Any":
        with capture_internal_exceptions():
            if not run_id:
                return

            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))

            model = (
                all_params.get("model")
                or all_params.get("model_name")
                or all_params.get("model_id")
                or ""
            )

            span = self._create_span(
                run_id,
                parent_run_id,
                op=OP.GEN_AI_TEXT_COMPLETION,
                name=f"text_completion {model}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )
            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion")

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

            if model:
                set_on_span(
                    SPANDATA.GEN_AI_REQUEST_MODEL,
                    model,
                )

            ai_system = _get_ai_system(all_params)
            if ai_system:
                set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system)

            for key, attribute in DATA_FIELDS.items():
                if key in all_params and all_params[key] is not None:
                    set_data_normalized(span, attribute, all_params[key], unpack=False)

            _set_tools_on_span(span, all_params.get("tools"))

            if should_send_default_pii() and self.include_prompts:
                normalized_messages = [
                    {
                        "role": GEN_AI_ALLOWED_MESSAGE_ROLES.USER,
                        "content": {"type": "text", "text": prompt},
                    }
                    for prompt in prompts
                ]

                client = sentry_sdk.get_client()
                scope = sentry_sdk.get_current_scope()
                messages_data = (
                    truncate_and_annotate_messages(normalized_messages, span, scope)
                    if should_truncate_gen_ai_input(client.options)
                    else normalized_messages
                )
                if messages_data is not None:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        messages_data,
                        unpack=False,
                    )

    def on_chat_model_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        messages: "List[List[BaseMessage]]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return

            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))

            model = (
                all_params.get("model")
                or all_params.get("model_name")
                or all_params.get("model_id")
                or ""
            )

            span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.GEN_AI_CHAT,
                name=f"chat {model}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )
            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
            if model:
                set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model)

            ai_system = _get_ai_system(all_params)
            if ai_system:
                set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system)

            agent_metadata = kwargs.get("metadata")
            if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata:
                set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"])

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(
                    SPANDATA.GEN_AI_FUNCTION_ID,
                    run_name,
                )

            for key, attribute in DATA_FIELDS.items():
                if key in all_params and all_params[key] is not None:
                    set_data_normalized(span, attribute, all_params[key], unpack=False)

            _set_tools_on_span(span, all_params.get("tools"))

            if should_send_default_pii() and self.include_prompts:
                system_instructions = _get_system_instructions(messages)
                if len(system_instructions) > 0:
                    set_on_span(
                        SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
                        json.dumps(_transform_system_instructions(system_instructions)),
                    )

                normalized_messages = []
                for list_ in messages:
                    for message in list_:
                        if message.type == "system":
                            continue

                        normalized_messages.append(
                            self._normalize_langchain_message(message)
                        )
                normalized_messages = normalize_message_roles(normalized_messages)

                client = sentry_sdk.get_client()
                scope = sentry_sdk.get_current_scope()
                messages_data = (
                    truncate_and_annotate_messages(normalized_messages, span, scope)
                    if should_truncate_gen_ai_input(client.options)
                    else normalized_messages
                )
                if messages_data is not None:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        messages_data,
                        unpack=False,
                    )

    def on_chat_model_end(
        self: "SentryLangchainCallback",
        response: "LLMResult",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_RESPONSE_TEXT,
                    [[x.text for x in list_] for list_ in response.generations],
                )

            _record_token_usage(span, response)
            self._exit_span(span, run_id)

    def on_llm_end(
        self: "SentryLangchainCallback",
        response: "LLMResult",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when LLM ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            try:
                generation = response.generations[0][0]
            except IndexError:
                generation = None

            if generation is not None:
                set_on_span = (
                    span.set_attribute
                    if isinstance(span, StreamedSpan)
                    else span.set_data
                )

                try:
                    response_model = generation.message.response_metadata.get(
                        "model_name"
                    )
                    if response_model is not None:
                        set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
                except AttributeError:
                    pass

                try:
                    finish_reason = generation.generation_info.get("finish_reason")
                    if finish_reason is not None:
                        set_on_span(
                            SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS,
                            [finish_reason],
                        )
                except AttributeError:
                    pass

                try:
                    if should_send_default_pii() and self.include_prompts:
                        tool_calls = getattr(generation.message, "tool_calls", None)
                        if tool_calls is not None and tool_calls != []:
                            set_data_normalized(
                                span,
                                SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
                                tool_calls,
                                unpack=False,
                            )
                except AttributeError:
                    pass

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_RESPONSE_TEXT,
                    [[x.text for x in list_] for list_ in response.generations],
                )

            _record_token_usage(span, response)
            self._exit_span(span, run_id)

    def on_llm_error(
        self: "SentryLangchainCallback",
        error: "Union[Exception, KeyboardInterrupt]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when LLM errors."""
        self._handle_error(run_id, error)

    def on_chat_model_error(
        self: "SentryLangchainCallback",
        error: "Union[Exception, KeyboardInterrupt]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model errors."""
        self._handle_error(run_id, error)

    def on_agent_finish(
        self: "SentryLangchainCallback",
        finish: "AgentFinish",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items()
                )

            self._exit_span(span, run_id)

    def on_tool_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        input_str: str,
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when tool starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return

            tool_name = serialized.get("name") or kwargs.get("name") or ""

            span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.GEN_AI_EXECUTE_TOOL,
                name=f"execute_tool {tool_name}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )

            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool")
            set_on_span(SPANDATA.GEN_AI_TOOL_NAME, tool_name)

            tool_description = serialized.get("description")
            if tool_description is not None:
                set_on_span(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description)

            agent_metadata = kwargs.get("metadata")
            if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata:
                set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"])

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(
                    SPANDATA.GEN_AI_FUNCTION_ID,
                    run_name,
                )

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_TOOL_INPUT,
                    kwargs.get("inputs", [input_str]),
                )

    def on_tool_end(
        self: "SentryLangchainCallback", output: str, *, run_id: "UUID", **kwargs: "Any"
    ) -> "Any":
        """Run when tool ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output)

            self._exit_span(span, run_id)

    def on_tool_error(
        self,
        error: "SentryLangchainCallback",
        *args: "Union[Exception, KeyboardInterrupt]",
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when tool errors."""
        self._handle_error(run_id, error)


def _extract_tokens(
    token_usage: "Any",
) -> "tuple[Optional[int], Optional[int], Optional[int]]":
    if not token_usage:
        return None, None, None

    input_tokens = _get_value(token_usage, "prompt_tokens") or _get_value(
        token_usage, "input_tokens"
    )
    output_tokens = _get_value(token_usage, "completion_tokens") or _get_value(
        token_usage, "output_tokens"
    )
    total_tokens = _get_value(token_usage, "total_tokens")

    return input_tokens, output_tokens, total_tokens


def _extract_tokens_from_generations(
    generations: "Any",
) -> "tuple[Optional[int], Optional[int], Optional[int]]":
    """Extract token usage from response.generations structure."""
    if not generations:
        return None, None, None

    total_input = 0
    total_output = 0
    total_total = 0

    for gen_list in generations:
        for gen in gen_list:
            token_usage = _get_token_usage(gen)
            input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
            total_input += input_tokens if input_tokens is not None else 0
            total_output += output_tokens if output_tokens is not None else 0
            total_total += total_tokens if total_tokens is not None else 0

    return (
        total_input if total_input > 0 else None,
        total_output if total_output > 0 else None,
        total_total if total_total > 0 else None,
    )


def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]":
    """
    Check multiple paths to extract token usage from different objects.
    """
    possible_names = ("usage", "token_usage", "usage_metadata")

    message = _get_value(obj, "message")
    if message is not None:
        for name in possible_names:
            usage = _get_value(message, name)
            if usage is not None:
                return usage

    llm_output = _get_value(obj, "llm_output")
    if llm_output is not None:
        for name in possible_names:
            usage = _get_value(llm_output, name)
            if usage is not None:
                return usage

    for name in possible_names:
        usage = _get_value(obj, name)
        if usage is not None:
            return usage

    return None


def _record_token_usage(span: "Union[Span, StreamedSpan]", response: "Any") -> None:
    token_usage = _get_token_usage(response)
    if token_usage:
        input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
    else:
        input_tokens, output_tokens, total_tokens = _extract_tokens_from_generations(
            response.generations
        )

    set_on_span = (
        span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
    )

    if input_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens)

    if output_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens)

    if total_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens)


def _get_request_data(
    obj: "Any", args: "Any", kwargs: "Any"
) -> "tuple[Optional[str], Optional[List[Any]]]":
    """
    Get the agent name and available tools for the agent.
    """
    agent = getattr(obj, "agent", None)
    runnable = getattr(agent, "runnable", None)
    runnable_config = getattr(runnable, "config", {})
    tools = (
        getattr(obj, "tools", None)
        or getattr(agent, "tools", None)
        or runnable_config.get("tools")
        or runnable_config.get("available_tools")
    )
    tools = tools if tools and len(tools) > 0 else None

    try:
        agent_name = None
        if len(args) > 1:
            agent_name = args[1].get("run_name")
        if agent_name is None:
            agent_name = runnable_config.get("run_name")
    except Exception:
        pass

    return (agent_name, tools)


def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]":
    """Parse and simplify tools into a cleaner format."""
    if not tools:
        return None

    if not isinstance(tools, (list, tuple)):
        return None

    simplified_tools = []
    for tool in tools:
        try:
            if isinstance(tool, dict):
                if "function" in tool and isinstance(tool["function"], dict):
                    func = tool["function"]
                    simplified_tool = {
                        "name": func.get("name"),
                        "description": func.get("description"),
                    }
                    if simplified_tool["name"]:
                        simplified_tools.append(simplified_tool)
                elif "name" in tool:
                    simplified_tool = {
                        "name": tool.get("name"),
                        "description": tool.get("description"),
                    }
                    simplified_tools.append(simplified_tool)
                else:
                    name = (
                        tool.get("name")
                        or tool.get("tool_name")
                        or tool.get("function_name")
                    )
                    if name:
                        simplified_tools.append(
                            {
                                "name": name,
                                "description": tool.get("description")
                                or tool.get("desc"),
                            }
                        )
            elif hasattr(tool, "name"):
                simplified_tool = {
                    "name": getattr(tool, "name", None),
                    "description": getattr(tool, "description", None)
                    or getattr(tool, "desc", None),
                }
                if simplified_tool["name"]:
                    simplified_tools.append(simplified_tool)
            elif hasattr(tool, "__name__"):
                simplified_tools.append(
                    {
                        "name": tool.__name__,
                        "description": getattr(tool, "__doc__", None),
                    }
                )
            else:
                tool_str = str(tool)
                if tool_str and tool_str != "":
                    simplified_tools.append({"name": tool_str, "description": None})
        except Exception:
            continue

    return simplified_tools if simplified_tools else None


def _set_tools_on_span(span: "Union[Span, StreamedSpan]", tools: "Any") -> None:
    """Set available tools data on a span if tools are provided."""
    if tools is not None:
        simplified_tools = _simplify_langchain_tools(tools)
        if simplified_tools:
            set_data_normalized(
                span,
                SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS,
                simplified_tools,
                unpack=False,
            )


def _wrap_configure(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_configure(
        callback_manager_cls: type,
        inheritable_callbacks: "Callbacks" = None,
        local_callbacks: "Callbacks" = None,
        *args: "Any",
        **kwargs: "Any",
    ) -> "Any":
        integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
        if integration is None:
            return f(
                callback_manager_cls,
                inheritable_callbacks,
                local_callbacks,
                *args,
                **kwargs,
            )

        local_callbacks = local_callbacks or []

        # Handle each possible type of local_callbacks. For each type, we
        # extract the list of callbacks to check for SentryLangchainCallback,
        # and define a function that would add the SentryLangchainCallback
        # to the existing callbacks list.
        if isinstance(local_callbacks, BaseCallbackManager):
            callbacks_list = local_callbacks.handlers
        elif isinstance(local_callbacks, BaseCallbackHandler):
            callbacks_list = [local_callbacks]
        elif isinstance(local_callbacks, list):
            callbacks_list = local_callbacks
        else:
            logger.debug("Unknown callback type: %s", local_callbacks)
            # Just proceed with original function call
            return f(
                callback_manager_cls,
                inheritable_callbacks,
                local_callbacks,
                *args,
                **kwargs,
            )

        # Handle each possible type of inheritable_callbacks.
        if isinstance(inheritable_callbacks, BaseCallbackManager):
            inheritable_callbacks_list = inheritable_callbacks.handlers
        elif isinstance(inheritable_callbacks, list):
            inheritable_callbacks_list = inheritable_callbacks
        else:
            inheritable_callbacks_list = []

        if not any(
            isinstance(cb, SentryLangchainCallback)
            for cb in itertools.chain(callbacks_list, inheritable_callbacks_list)
        ):
            sentry_handler = SentryLangchainCallback(
                integration.max_spans,
                integration.include_prompts,
            )
            if isinstance(local_callbacks, BaseCallbackManager):
                local_callbacks = local_callbacks.copy()
                local_callbacks.handlers = [
                    *local_callbacks.handlers,
                    sentry_handler,
                ]
            elif isinstance(local_callbacks, BaseCallbackHandler):
                local_callbacks = [local_callbacks, sentry_handler]
            else:
                local_callbacks = [*local_callbacks, sentry_handler]

        return f(
            callback_manager_cls,
            inheritable_callbacks,
            local_callbacks,
            *args,
            **kwargs,
        )

    return new_configure


def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        run_name, tools = _get_request_data(self, args, kwargs)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                attributes={
                    "sentry.op": OP.GEN_AI_INVOKE_AGENT,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent",
                    SPANDATA.GEN_AI_RESPONSE_STREAMING: False,
                },
            ) as span:
                if run_name:
                    span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

                _set_tools_on_span(span, tools)

                # Run the agent
                result = f(self, *args, **kwargs)

                input = result.get("input")
                if (
                    input is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    normalized_messages = normalize_message_roles([input])

                    client = sentry_sdk.get_client()
                    scope = sentry_sdk.get_current_scope()
                    messages_data = (
                        truncate_and_annotate_messages(normalized_messages, span, scope)
                        if should_truncate_gen_ai_input(client.options)
                        else normalized_messages
                    )
                    if messages_data is not None:
                        set_data_normalized(
                            span,
                            SPANDATA.GEN_AI_REQUEST_MESSAGES,
                            messages_data,
                            unpack=False,
                        )

                output = result.get("output")
                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                return result
        else:
            start_span_function = get_start_span_function()

            with start_span_function(
                op=OP.GEN_AI_INVOKE_AGENT,
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                origin=LangchainIntegration.origin,
            ) as span:
                if run_name:
                    span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
                span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)

                _set_tools_on_span(span, tools)

                # Run the agent
                result = f(self, *args, **kwargs)

                input = result.get("input")
                if (
                    input is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    normalized_messages = normalize_message_roles([input])

                    client = sentry_sdk.get_client()
                    scope = sentry_sdk.get_current_scope()
                    messages_data = (
                        truncate_and_annotate_messages(normalized_messages, span, scope)
                        if should_truncate_gen_ai_input(client.options)
                        else normalized_messages
                    )
                    if messages_data is not None:
                        set_data_normalized(
                            span,
                            SPANDATA.GEN_AI_REQUEST_MESSAGES,
                            messages_data,
                            unpack=False,
                        )

                output = result.get("output")
                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                return result

    return new_invoke


def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        run_name, tools = _get_request_data(self, args, kwargs)

        if has_span_streaming_enabled(client.options):
            span = sentry_sdk.traces.start_span(
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                attributes={
                    "sentry.op": OP.GEN_AI_INVOKE_AGENT,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent",
                    SPANDATA.GEN_AI_RESPONSE_STREAMING: True,
                },
            )

            if run_name:
                span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name)
        else:
            start_span_function = get_start_span_function()

            span = start_span_function(
                op=OP.GEN_AI_INVOKE_AGENT,
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                origin=LangchainIntegration.origin,
            )
            span.__enter__()

            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

            if run_name:
                span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

        _set_tools_on_span(span, tools)

        input = args[0].get("input") if len(args) >= 1 else None
        if (
            input is not None
            and should_send_default_pii()
            and integration.include_prompts
        ):
            normalized_messages = normalize_message_roles([input])

            client = sentry_sdk.get_client()
            scope = sentry_sdk.get_current_scope()
            messages_data = (
                truncate_and_annotate_messages(normalized_messages, span, scope)
                if should_truncate_gen_ai_input(client.options)
                else normalized_messages
            )
            if messages_data is not None:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_REQUEST_MESSAGES,
                    messages_data,
                    unpack=False,
                )

        # Run the agent
        result = f(self, *args, **kwargs)

        old_iterator = result

        def new_iterator() -> "Iterator[Any]":
            exc_info: "tuple[Any, Any, Any]" = (None, None, None)
            try:
                for event in old_iterator:
                    yield event

                try:
                    output = event.get("output")
                except Exception:
                    output = None

                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                span.__exit__(None, None, None)
            except Exception:
                exc_info = sys.exc_info()
                with capture_internal_exceptions():
                    span.__exit__(*exc_info)
                raise

        async def new_iterator_async() -> "AsyncIterator[Any]":
            exc_info: "tuple[Any, Any, Any]" = (None, None, None)
            try:
                async for event in old_iterator:
                    yield event

                try:
                    output = event.get("output")
                except Exception:
                    output = None

                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                span.__exit__(None, None, None)
            except Exception:
                exc_info = sys.exc_info()
                with capture_internal_exceptions():
                    span.__exit__(*exc_info)
                raise

        if str(type(result)) == "<class 'async_generator'>":
            result = new_iterator_async()
        else:
            result = new_iterator()

        return result

    return new_stream


def _patch_embeddings_provider(provider_class: "Any") -> None:
    """Patch an embeddings provider class with monitoring wrappers."""
    if provider_class is None:
        return

    if hasattr(provider_class, "embed_documents"):
        provider_class.embed_documents = _wrap_embedding_method(
            provider_class.embed_documents
        )
    if hasattr(provider_class, "embed_query"):
        provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query)
    if hasattr(provider_class, "aembed_documents"):
        provider_class.aembed_documents = _wrap_async_embedding_method(
            provider_class.aembed_documents
        )
    if hasattr(provider_class, "aembed_query"):
        provider_class.aembed_query = _wrap_async_embedding_method(
            provider_class.aembed_query
        )


def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """Wrap sync embedding methods (embed_documents and embed_query)."""

    @wraps(f)
    def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        model_name = getattr(self, "model", None) or getattr(self, "model_name", None)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"embeddings {model_name}" if model_name else "embeddings",
                attributes={
                    "sentry.op": OP.GEN_AI_EMBEDDINGS,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "embeddings",
                },
            ) as span:
                if model_name:
                    span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = f(self, *args, **kwargs)
                return result
        else:
            with sentry_sdk.start_span(
                op=OP.GEN_AI_EMBEDDINGS,
                name=f"embeddings {model_name}" if model_name else "embeddings",
                origin=LangchainIntegration.origin,
            ) as span:
                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
                if model_name:
                    span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = f(self, *args, **kwargs)
                return result

    return new_embedding_method


def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """Wrap async embedding methods (aembed_documents and aembed_query)."""

    @wraps(f)
    async def new_async_embedding_method(
        self: "Any", *args: "Any", **kwargs: "Any"
    ) -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return await f(self, *args, **kwargs)

        model_name = getattr(self, "model", None) or getattr(self, "model_name", None)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"embeddings {model_name}" if model_name else "embeddings",
                attributes={
                    "sentry.op": OP.GEN_AI_EMBEDDINGS,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "embeddings",
                },
            ) as span:
                if model_name:
                    span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = await f(self, *args, **kwargs)
                return result
        else:
            with sentry_sdk.start_span(
                op=OP.GEN_AI_EMBEDDINGS,
                name=f"embeddings {model_name}" if model_name else "embeddings",
                origin=LangchainIntegration.origin,
            ) as span:
                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
                if model_name:
                    span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = await f(self, *args, **kwargs)
                return result

    return new_async_embedding_method

Directory Contents

Dirs: 10 × Files: 73

Name Size Perms Modified Actions
celery DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
django DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
grpc DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
redis DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
spark DIR
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
- drwxr-xr-x 2026-06-11 06:30:31
Edit Download
19.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.09 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
39.00 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
876 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.70 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.23 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
20.06 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.68 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
17.41 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.91 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
6.20 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.21 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.85 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.49 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.44 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.86 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.02 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.25 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.93 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.04 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.27 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
10.57 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
2.72 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
4.93 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
5.71 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.79 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
9.80 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
8.19 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
15.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
48.31 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
18.13 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.87 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
13.03 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
11.46 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
15.69 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
6.35 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
23.12 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
787 B lrw-r--r-- 2026-06-11 06:30:30
Edit Download
53.38 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
1.08 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.99 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
4.41 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
8.21 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.42 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
6.82 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.32 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.75 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
7.81 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
9.44 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
15.25 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.58 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.02 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
5.24 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
27.93 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
11.04 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.19 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
14.01 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
17.39 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
2.35 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
6.88 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
10.79 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.67 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.72 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.02 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
1.65 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
15.03 KB lrw-r--r-- 2026-06-11 06:30:31
Edit Download
4.00 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
7.28 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download
12.51 KB lrw-r--r-- 2026-06-11 06:30:30
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).