REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 5.51 KB
Close
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/google_genai/streaming.py
Text
Base64
from typing import TYPE_CHECKING, Any, List, Optional, TypedDict, Union from sentry_sdk.ai.utils import set_data_normalized from sentry_sdk.consts import SPANDATA from sentry_sdk.scope import should_send_default_pii from sentry_sdk.traces import StreamedSpan from sentry_sdk.utils import ( safe_serialize, ) from .utils import ( UsageData, extract_contents_text, extract_finish_reasons, extract_tool_calls, extract_usage_data, ) if TYPE_CHECKING: from google.genai.types import GenerateContentResponse from sentry_sdk.tracing import Span class AccumulatedResponse(TypedDict): id: "Optional[str]" model: "Optional[str]" text: str finish_reasons: "List[str]" tool_calls: "List[dict[str, Any]]" usage_metadata: "Optional[UsageData]" def element_wise_usage_max(self: "UsageData", other: "UsageData") -> "UsageData": return UsageData( input_tokens=max(self["input_tokens"], other["input_tokens"]), output_tokens=max(self["output_tokens"], other["output_tokens"]), input_tokens_cached=max( self["input_tokens_cached"], other["input_tokens_cached"] ), output_tokens_reasoning=max( self["output_tokens_reasoning"], other["output_tokens_reasoning"] ), total_tokens=max(self["total_tokens"], other["total_tokens"]), ) def accumulate_streaming_response( chunks: "List[GenerateContentResponse]", ) -> "AccumulatedResponse": """Accumulate streaming chunks into a single response-like object.""" accumulated_text = [] finish_reasons = [] tool_calls = [] usage_data = None response_id = None model = None for chunk in chunks: # Extract text and tool calls if getattr(chunk, "candidates", None): for candidate in getattr(chunk, "candidates", []): if hasattr(candidate, "content") and getattr( candidate.content, "parts", [] ): extracted_text = extract_contents_text(candidate.content) if extracted_text: accumulated_text.append(extracted_text) extracted_finish_reasons = extract_finish_reasons(chunk) if extracted_finish_reasons: finish_reasons.extend(extracted_finish_reasons) extracted_tool_calls = extract_tool_calls(chunk) if extracted_tool_calls: tool_calls.extend(extracted_tool_calls) # Use last possible chunk, in case of interruption, and # gracefully handle missing intermediate tokens by taking maximum # with previous token reporting. chunk_usage_data = extract_usage_data(chunk) usage_data = ( chunk_usage_data if usage_data is None else element_wise_usage_max(usage_data, chunk_usage_data) ) accumulated_response = AccumulatedResponse( text="".join(accumulated_text), finish_reasons=finish_reasons, tool_calls=tool_calls, usage_metadata=usage_data, id=response_id, model=model, ) return accumulated_response def set_span_data_for_streaming_response( span: "Union[Span, StreamedSpan]", integration: "Any", accumulated_response: "AccumulatedResponse", ) -> None: """Set span data for accumulated streaming response.""" set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) if ( should_send_default_pii() and integration.include_prompts and accumulated_response.get("text") ): set_on_span( SPANDATA.GEN_AI_RESPONSE_TEXT, safe_serialize([accumulated_response["text"]]), ) if accumulated_response.get("finish_reasons"): set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, accumulated_response["finish_reasons"], ) if accumulated_response.get("tool_calls"): set_on_span( SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, safe_serialize(accumulated_response["tool_calls"]), ) response_id = accumulated_response.get("id") if response_id is not None: set_on_span(SPANDATA.GEN_AI_RESPONSE_ID, response_id) response_model = accumulated_response.get("model") if response_model is not None: set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) if accumulated_response["usage_metadata"] is None: return if accumulated_response["usage_metadata"]["input_tokens"]: set_on_span( SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, accumulated_response["usage_metadata"]["input_tokens"], ) if accumulated_response["usage_metadata"]["input_tokens_cached"]: set_on_span( SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED, accumulated_response["usage_metadata"]["input_tokens_cached"], ) if accumulated_response["usage_metadata"]["output_tokens"]: set_on_span( SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, accumulated_response["usage_metadata"]["output_tokens"], ) if accumulated_response["usage_metadata"]["output_tokens_reasoning"]: set_on_span( SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS_REASONING, accumulated_response["usage_metadata"]["output_tokens_reasoning"], ) if accumulated_response["usage_metadata"]["total_tokens"]: set_on_span( SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, accumulated_response["usage_metadata"]["total_tokens"], )
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 4
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
consts.py
559 B
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
streaming.py
5.51 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
utils.py
38.03 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
17.02 KB
lrw-r--r--
2026-06-11 06:30:31
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).