REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 11.21 KB
Close
//opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/pydantic_ai/spans/ai_client.py
Text
Base64
import json
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.ai.utils import (
    normalize_message_roles,
    set_data_normalized,
    truncate_and_annotate_messages,
)
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import safe_serialize

from ..consts import SPAN_ORIGIN
from ..utils import (
    _get_model_name,
    _set_agent_data,
    _set_available_tools,
    _set_model_data,
    _should_send_prompts,
    get_current_agent,
    get_is_streaming,
)
from .utils import (
    _serialize_binary_content_item,
    _serialize_image_url_item,
    _set_usage_data,
)

if TYPE_CHECKING:
    from typing import Any, Dict, List, Union

    from pydantic_ai.messages import ModelMessage, SystemPromptPart  # type: ignore

    from sentry_sdk._types import TextPart as SentryTextPart

try:
    from pydantic_ai.messages import (
        BaseToolCallPart,
        BaseToolReturnPart,
        BinaryContent,
        ImageUrl,
        SystemPromptPart,
        TextPart,
        ThinkingPart,
        UserPromptPart,
    )
except ImportError:
    # Fallback if these classes are not available
    BaseToolCallPart = None
    BaseToolReturnPart = None
    SystemPromptPart = None
    UserPromptPart = None
    TextPart = None
    ThinkingPart = None
    BinaryContent = None
    ImageUrl = None


def _transform_system_instructions(
    permanent_instructions: "list[SystemPromptPart]",
    current_instructions: "list[str]",
) -> "list[SentryTextPart]":
    """Convert system instructions into a flat list of text-part dicts.

    Args:
        permanent_instructions: Parts whose ``content`` attribute holds the
            instruction text.
        current_instructions: Instruction values used as-is.

    Returns:
        One ``{"type": "text", "content": ...}`` dict per instruction, with
        the permanent instructions first.
    """
    text_parts: "list[SentryTextPart]" = [
        {
            "type": "text",
            "content": instruction.content,
        }
        for instruction in permanent_instructions
    ]

    text_parts.extend(
        {
            "type": "text",
            "content": instruction,
        }
        for instruction in current_instructions
    )

    return text_parts


def _get_system_instructions(
    messages: "list[ModelMessage]",
) -> "tuple[list[SystemPromptPart], list[str]]":
    """Collect system instructions from a list of messages.

    Args:
        messages: Messages to scan. Each one is inspected for a ``parts``
            attribute and an ``instructions`` attribute.

    Returns:
        A ``(permanent, current)`` tuple: the ``SystemPromptPart`` instances
        found in any message's ``parts``, and every non-``None``
        ``instructions`` value found on a message.
    """
    permanent_instructions = []
    current_instructions = []

    for msg in messages:
        if hasattr(msg, "parts"):
            # SystemPromptPart is None when the pydantic_ai import failed.
            permanent_instructions.extend(
                part
                for part in msg.parts
                if SystemPromptPart and isinstance(part, SystemPromptPart)
            )

        if hasattr(msg, "instructions") and msg.instructions is not None:
            current_instructions.append(msg.instructions)

    return permanent_instructions, current_instructions


def _format_message_part(part: "Any") -> "Dict[str, Any] | None":
    """Turn a single message part into a message dict.

    Args:
        part: One entry of a message's ``parts``.

    Returns:
        A dict with a ``role`` key plus whichever of ``content``,
        ``tool_calls`` and ``tool_call_id`` are non-empty, or ``None`` when
        the part is a system prompt or yields neither content nor tool calls.
    """
    role = "user"
    # Use isinstance checks with proper base classes
    if SystemPromptPart and isinstance(part, SystemPromptPart):
        # System prompts are reported separately as system instructions.
        return None
    elif (
        (TextPart and isinstance(part, TextPart))
        or (ThinkingPart and isinstance(part, ThinkingPart))
        or (BaseToolCallPart and isinstance(part, BaseToolCallPart))
    ):
        role = "assistant"
    elif BaseToolReturnPart and isinstance(part, BaseToolReturnPart):
        role = "tool"

    content: "List[Dict[str, Any] | str]" = []
    tool_calls = None
    tool_call_id = None

    # Handle ToolCallPart (assistant requesting tool use)
    if BaseToolCallPart and isinstance(part, BaseToolCallPart):
        tool_call_data = {}
        if hasattr(part, "tool_name"):
            tool_call_data["name"] = part.tool_name
        if hasattr(part, "args"):
            tool_call_data["arguments"] = safe_serialize(part.args)
        if tool_call_data:
            tool_calls = [tool_call_data]
    # Handle ToolReturnPart (tool result)
    elif BaseToolReturnPart and isinstance(part, BaseToolReturnPart):
        if hasattr(part, "tool_name"):
            # NOTE(review): the "tool_call_id" field is filled with the tool
            # *name*; confirm whether the part's own call id was intended.
            tool_call_id = part.tool_name
        if hasattr(part, "content"):
            content.append({"type": "text", "text": str(part.content)})
    # Handle regular content
    elif hasattr(part, "content"):
        if isinstance(part.content, str):
            content.append({"type": "text", "text": part.content})
        elif isinstance(part.content, list):
            for item in part.content:
                if isinstance(item, str):
                    content.append({"type": "text", "text": item})
                elif ImageUrl and isinstance(item, ImageUrl):
                    content.append(_serialize_image_url_item(item))
                elif BinaryContent and isinstance(item, BinaryContent):
                    content.append(_serialize_binary_content_item(item))
                else:
                    content.append(safe_serialize(item))
        else:
            content.append({"type": "text", "text": str(part.content)})

    # Add message if we have content or tool calls
    if not (content or tool_calls):
        return None

    message: "Dict[str, Any]" = {"role": role}
    if content:
        message["content"] = content
    if tool_calls:
        message["tool_calls"] = tool_calls
    if tool_call_id:
        message["tool_call_id"] = tool_call_id
    return message


def _set_input_messages(
    span: "Union[sentry_sdk.tracing.Span, StreamedSpan]", messages: "Any"
) -> None:
    """Set input messages data on a span.

    Does nothing when ``_should_send_prompts()`` is false or ``messages`` is
    empty. Otherwise records system instructions (JSON-encoded) and the
    formatted request messages on the span. Failures while formatting the
    request messages are swallowed.

    Args:
        span: Span to annotate.
        messages: Messages whose ``parts`` are formatted.
    """
    if not _should_send_prompts():
        return

    if not messages:
        return

    # Streamed spans take attributes; other spans take data.
    set_on_span = (
        span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
    )

    permanent_instructions, current_instructions = _get_system_instructions(messages)
    if permanent_instructions or current_instructions:
        set_on_span(
            SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
            json.dumps(
                _transform_system_instructions(
                    permanent_instructions, current_instructions
                )
            ),
        )

    try:
        formatted_messages = []

        for msg in messages:
            if hasattr(msg, "parts"):
                for part in msg.parts:
                    message = _format_message_part(part)
                    if message is not None:
                        formatted_messages.append(message)

        if formatted_messages:
            normalized_messages = normalize_message_roles(formatted_messages)
            client = sentry_sdk.get_client()
            scope = sentry_sdk.get_current_scope()
            # Truncation is skipped when "stream_gen_ai_spans" is enabled.
            messages_data = (
                normalized_messages
                if client.options.get("stream_gen_ai_spans", False)
                else truncate_and_annotate_messages(normalized_messages, span, scope)
            )
            set_data_normalized(
                span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
            )
    except Exception:
        # If we fail to format messages, just skip it
        pass


def _set_output_data(
    span: "Union[sentry_sdk.tracing.Span, StreamedSpan]", response: "Any"
) -> None:
    """Set output data on a span.

    Does nothing when ``_should_send_prompts()`` is false or ``response`` is
    falsy. Otherwise records the response model name, the text of each text
    part, and the serialized tool calls. Failures while formatting the parts
    are swallowed.

    Args:
        span: Span to annotate.
        response: Response whose ``model_name`` and ``parts`` are read when
            present.
    """
    if not _should_send_prompts():
        return

    if not response:
        return

    set_on_span = (
        span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
    )

    # Guarded like every other attribute read here, so a response without a
    # model name cannot raise into the instrumented call.
    if hasattr(response, "model_name"):
        set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response.model_name)

    try:
        # Extract text from ModelResponse
        if hasattr(response, "parts"):
            texts = []
            tool_calls = []

            for part in response.parts:
                if TextPart and isinstance(part, TextPart) and hasattr(part, "content"):
                    texts.append(part.content)
                elif BaseToolCallPart and isinstance(part, BaseToolCallPart):
                    tool_call_data = {
                        "type": "function",
                    }
                    if hasattr(part, "tool_name"):
                        tool_call_data["name"] = part.tool_name
                    if hasattr(part, "args"):
                        tool_call_data["arguments"] = safe_serialize(part.args)
                    tool_calls.append(tool_call_data)

            if texts:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, texts)

            if tool_calls:
                set_on_span(
                    SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, safe_serialize(tool_calls)
                )

    except Exception:
        # If we fail to format output, just skip it
        pass


def ai_client_span(
    messages: "Any", agent: "Any", model: "Any", model_settings: "Any"
) -> "Union[sentry_sdk.tracing.Span, StreamedSpan]":
    """Create a span for an AI client call (model request).

    Args:
        messages: Full conversation history (list of messages)
        agent: Agent object
        model: Model object
        model_settings: Model settings

    Returns:
        The started span, named ``chat <model name>``.
    """
    # Determine model name for span name
    model_obj = model
    if agent and hasattr(agent, "model"):
        model_obj = agent.model

    model_name = _get_model_name(model_obj) or "unknown"

    span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
    if span_streaming:
        span = sentry_sdk.traces.start_span(
            name=f"chat {model_name}",
            attributes={
                "sentry.op": OP.GEN_AI_CHAT,
                "sentry.origin": SPAN_ORIGIN,
                SPANDATA.GEN_AI_OPERATION_NAME: "chat",
                SPANDATA.GEN_AI_RESPONSE_STREAMING: get_is_streaming(),
            },
        )
    else:
        span = sentry_sdk.start_span(
            op=OP.GEN_AI_CHAT,
            name=f"chat {model_name}",
            origin=SPAN_ORIGIN,
        )

        span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")

        # Set streaming flag from contextvar
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, get_is_streaming())

    _set_agent_data(span, agent)
    _set_model_data(span, model, model_settings)

    # Add available tools if agent is available
    agent_obj = agent or get_current_agent()
    _set_available_tools(span, agent_obj)

    # Set input messages (full conversation history)
    if messages:
        _set_input_messages(span, messages)

    return span


def update_ai_client_span(
    span: "Union[sentry_sdk.tracing.Span, StreamedSpan]", model_response: "Any"
) -> None:
    """Update the AI client span with response data.

    Args:
        span: Span to update; nothing happens when it is falsy.
        model_response: Response whose ``usage`` (when present) and output
            data are recorded on the span.
    """
    if not span:
        return

    # Set usage data if available
    if model_response and hasattr(model_response, "usage"):
        _set_usage_data(span, model_response.usage)

    # Set output data
    _set_output_data(span, model_response)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 5
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
ai_client.py
11.21 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
execute_tool.py
2.59 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
invoke_agent.py
6.00 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
utils.py
2.88 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
243 B
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).