Preview: agent_run.py
Size: 11.66 KB
//opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/openai_agents/patches/agent_run.py
import sys
from typing import TYPE_CHECKING
from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import capture_internal_exceptions, reraise
from ..spans import (
handoff_span,
invoke_agent_span,
update_invoke_agent_span,
)
if TYPE_CHECKING:
from typing import Any, Awaitable, Callable, Optional
from agents.run_internal.run_steps import SingleStepResult
from sentry_sdk.tracing import Span
try:
import agents
except ImportError:
raise DidNotEnable("OpenAI Agents not installed")
def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None
def _get_current_agent(
context_wrapper: "agents.RunContextWrapper",
) -> "Optional[agents.Agent]":
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)
def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
"""Close the workflow span for streaming executions if it exists."""
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(*sys.exc_info())
delattr(agent, "_sentry_workflow_span")
def _maybe_start_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
should_run_agent_start_hooks: bool,
span_kwargs: "dict[str, Any]",
is_streaming: bool = False,
) -> "Optional[Span]":
"""
Start an agent invocation span if conditions are met.
Handles ending any existing span for a different agent.
Returns the new span if started, or the existing span if conditions aren't met.
"""
if not (should_run_agent_start_hooks and agent and context_wrapper):
return getattr(context_wrapper, "_sentry_agent_span", None)
# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent
)
span.__exit__(None, None, None)
delattr(context_wrapper, "_sentry_agent_span")
# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, span_kwargs)
context_wrapper._sentry_agent_span = span
agent._sentry_agent_span = span
if is_streaming:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
return span
async def _run_single_turn(
original_run_single_turn: "Callable[..., Awaitable[SingleStepResult]]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched _run_single_turn that
- creates agent invocation spans if there is no already active agent invocation span.
- ends the agent invocation span if and only if an exception is raised in `_run_single_turn()`.
"""
# openai-agents >= 0.14 passes `bindings: AgentBindings` instead of `agent`.
bindings = kwargs.get("bindings")
agent = (
getattr(bindings, "public_agent", None)
if bindings is not None
else kwargs.get("agent")
)
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)
span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)
if span is None or span.timestamp is not None:
return await original_run_single_turn(*args, **kwargs)
try:
result = await original_run_single_turn(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent
)
span.__exit__(*exc_info)
delattr(context_wrapper, "_sentry_agent_span")
reraise(*exc_info)
return result
async def _run_single_turn_streamed(
original_run_single_turn_streamed: "Callable[..., Awaitable[SingleStepResult]]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched _run_single_turn_streamed that
- creates agent invocation spans for streaming if there is no already active agent invocation span.
- ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.
Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature <v0.14 is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)
The call signature >=v0.14 is:
_run_single_turn_streamed(
streamed_result, # args[0]
bindings, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)
"""
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
# openai-agents >= 0.14 passes `bindings: AgentBindings` at args[1] instead of `agent`.
agent_or_bindings = (
args[1] if len(args) > 1 else kwargs.get("bindings", kwargs.get("agent"))
)
agent = getattr(agent_or_bindings, "public_agent", agent_or_bindings)
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
should_run_agent_start_hooks = bool(
args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks", False)
)
span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input
span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)
if span is None or span.timestamp is not None:
return await original_run_single_turn_streamed(*args, **kwargs)
try:
result = await original_run_single_turn_streamed(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent
)
span.__exit__(*exc_info)
delattr(context_wrapper, "_sentry_agent_span")
_close_streaming_workflow_span(agent)
reraise(*exc_info)
return result
async def _execute_handoffs(
original_execute_handoffs: "Callable[..., SingleStepResult]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched execute_handoffs that
- creates and manages handoff spans.
- ends the agent invocation span.
- ends the workflow span if the response is streamed and an exception is raised in `execute_handoffs()`.
"""
context_wrapper = kwargs.get("context_wrapper")
run_handoffs = kwargs.get("run_handoffs")
# openai-agents >= 0.14 renamed `agent` to `public_agent`.
agent = kwargs.get("public_agent", kwargs.get("agent"))
# Create Sentry handoff span for the first handoff (agents library only processes the first one)
if run_handoffs:
first_handoff = run_handoffs[0]
handoff_agent_name = first_handoff.handoff.agent_name
handoff_span(context_wrapper, agent, handoff_agent_name)
if not agent or not context_wrapper or not _has_active_agent_span(context_wrapper):
# Call original method with all parameters
try:
return await original_execute_handoffs(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_close_streaming_workflow_span(agent)
reraise(*exc_info)
# Call original method with all parameters
try:
result = await original_execute_handoffs(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_close_streaming_workflow_span(agent)
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent
)
span.__exit__(*exc_info)
delattr(context_wrapper, "_sentry_agent_span")
reraise(*exc_info)
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(span=span, context=context_wrapper, agent=agent)
span.__exit__(None, None, None)
delattr(context_wrapper, "_sentry_agent_span")
return result
async def _execute_final_output(
original_execute_final_output: "Callable[..., SingleStepResult]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched execute_final_output that
- ends the agent invocation span.
- ends the workflow span if the response is streamed.
"""
# openai-agents >= 0.14 renamed `agent` to `public_agent`.
agent = kwargs.get("public_agent", kwargs.get("agent"))
context_wrapper = kwargs.get("context_wrapper")
final_output = kwargs.get("final_output")
if not agent or not context_wrapper or not _has_active_agent_span(context_wrapper):
try:
return await original_execute_final_output(*args, **kwargs)
finally:
with capture_internal_exceptions():
# For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
_close_streaming_workflow_span(agent)
try:
result = await original_execute_final_output(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
# For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
_close_streaming_workflow_span(agent)
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent, output=final_output
)
span.__exit__(*exc_info)
delattr(context_wrapper, "_sentry_agent_span")
reraise(*exc_info)
span = getattr(context_wrapper, "_sentry_agent_span", None)
if span:
update_invoke_agent_span(
span=span, context=context_wrapper, agent=agent, output=final_output
)
span.__exit__(None, None, None)
delattr(context_wrapper, "_sentry_agent_span")
return result
Directory Contents
Dirs: 1 × Files: 6