REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 6.85 KB
Close
//opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/pydantic_ai/patches/agent_run.py
Text
Base64
import sys from functools import wraps from typing import TYPE_CHECKING import sentry_sdk from sentry_sdk.integrations import DidNotEnable from sentry_sdk.utils import capture_internal_exceptions, reraise from ..spans import invoke_agent_span, update_invoke_agent_span from ..utils import _capture_exception, pop_agent, push_agent try: from pydantic_ai.agent import Agent # type: ignore except ImportError: raise DidNotEnable("pydantic-ai not installed") if TYPE_CHECKING: from typing import Any, Callable, Optional, Union class _StreamingContextManagerWrapper: """Wrapper for streaming methods that return async context managers.""" def __init__( self, agent: "Any", original_ctx_manager: "Any", user_prompt: "Any", model: "Any", model_settings: "Any", is_streaming: bool = True, ) -> None: self.agent = agent self.original_ctx_manager = original_ctx_manager self.user_prompt = user_prompt self.model = model self.model_settings = model_settings self.is_streaming = is_streaming self._isolation_scope: "Any" = None self._span: "Optional[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan]]" = None self._result: "Any" = None async def __aenter__(self) -> "Any": # Set up isolation scope and invoke_agent span self._isolation_scope = sentry_sdk.isolation_scope() self._isolation_scope.__enter__() # Create invoke_agent span (will be closed in __aexit__) self._span = invoke_agent_span( self.user_prompt, self.agent, self.model, self.model_settings, self.is_streaming, ) self._span.__enter__() # Push agent to contextvar stack after span is successfully created and entered # This ensures proper pairing with pop_agent() in __aexit__ even if exceptions occur push_agent(self.agent, self.is_streaming) # Enter the original context manager result = await self.original_ctx_manager.__aenter__() self._result = result return result async def __aexit__(self, exc_type: "Any", exc_val: "Any", exc_tb: "Any") -> None: try: # Exit the original context manager first await self.original_ctx_manager.__aexit__(exc_type, exc_val, exc_tb) # Update span with result if successful if exc_type is None and self._result and self._span is not None: update_invoke_agent_span(self._span, self._result) finally: # Pop agent from contextvar stack pop_agent() # Clean up invoke span if self._span: self._span.__exit__(exc_type, exc_val, exc_tb) # Clean up isolation scope if self._isolation_scope: self._isolation_scope.__exit__(exc_type, exc_val, exc_tb) def _create_run_wrapper( original_func: "Callable[..., Any]", is_streaming: bool = False ) -> "Callable[..., Any]": """ Wraps the Agent.run method to create an invoke_agent span. Args: original_func: The original run method is_streaming: Whether this is a streaming method (for future use) """ from sentry_sdk.integrations.pydantic_ai import ( PydanticAIIntegration, ) # Required to avoid circular import @wraps(original_func) async def wrapper(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": # Isolate each workflow so that when agents are run in asyncio tasks they # don't touch each other's scopes with sentry_sdk.isolation_scope(): # Extract parameters for the span user_prompt = kwargs.get("user_prompt") or (args[0] if args else None) model = kwargs.get("model") model_settings = kwargs.get("model_settings") if PydanticAIIntegration.using_request_hooks: metadata = kwargs.get("metadata") if metadata is None: kwargs["metadata"] = {"_sentry_span": None} # Create invoke_agent span with invoke_agent_span( user_prompt, self, model, model_settings, is_streaming ) as span: # Push agent to contextvar stack after span is successfully created and entered # This ensures proper pairing with pop_agent() in finally even if exceptions occur push_agent(self, is_streaming) try: result = await original_func(self, *args, **kwargs) # Update span with result update_invoke_agent_span(span, result) return result except Exception as exc: exc_info = sys.exc_info() with capture_internal_exceptions(): _capture_exception(exc) reraise(*exc_info) finally: # Pop agent from contextvar stack pop_agent() return wrapper def _create_streaming_wrapper( original_func: "Callable[..., Any]", ) -> "Callable[..., Any]": """ Wraps run_stream method that returns an async context manager. """ from sentry_sdk.integrations.pydantic_ai import ( PydanticAIIntegration, ) # Required to avoid circular import @wraps(original_func) def wrapper(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": # Extract parameters for the span user_prompt = kwargs.get("user_prompt") or (args[0] if args else None) model = kwargs.get("model") model_settings = kwargs.get("model_settings") if PydanticAIIntegration.using_request_hooks: metadata = kwargs.get("metadata") if metadata is None: kwargs["metadata"] = {"_sentry_span": None} # Call original function to get the context manager original_ctx_manager = original_func(self, *args, **kwargs) # Wrap it with our instrumentation return _StreamingContextManagerWrapper( agent=self, original_ctx_manager=original_ctx_manager, user_prompt=user_prompt, model=model, model_settings=model_settings, is_streaming=True, ) return wrapper def _patch_agent_run() -> None: """ Patches the Agent run methods to create spans for agent execution. This patches both non-streaming (run, run_sync) and streaming (run_stream, run_stream_events) methods. """ # Store original methods original_run = Agent.run original_run_stream = Agent.run_stream # Wrap and apply patches for non-streaming methods Agent.run = _create_run_wrapper(original_run, is_streaming=False) # Wrap and apply patches for streaming methods Agent.run_stream = _create_streaming_wrapper(original_run_stream)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 4
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
agent_run.py
6.85 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
graph_nodes.py
3.69 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
tools.py
6.49 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
167 B
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).