Viewing file: executing.py (1.98 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
from __future__ import absolute_import
from sentry_sdk import Hub from sentry_sdk._types import MYPY from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.scope import add_global_event_processor from sentry_sdk.utils import walk_exception_chain, iter_stacks
if MYPY: from typing import Optional
from sentry_sdk._types import Event, Hint
try: import executing except ImportError: raise DidNotEnable("executing is not installed")
class ExecutingIntegration(Integration): identifier = "executing"
@staticmethod def setup_once(): # type: () -> None
@add_global_event_processor def add_executing_info(event, hint): # type: (Event, Optional[Hint]) -> Optional[Event] if Hub.current.get_integration(ExecutingIntegration) is None: return event
if hint is None: return event
exc_info = hint.get("exc_info", None)
if exc_info is None: return event
exception = event.get("exception", None)
if exception is None: return event
values = exception.get("values", None)
if values is None: return event
for exception, (_exc_type, _exc_value, exc_tb) in zip( reversed(values), walk_exception_chain(exc_info) ): sentry_frames = [ frame for frame in exception.get("stacktrace", {}).get("frames", []) if frame.get("function") ] tbs = list(iter_stacks(exc_tb)) if len(sentry_frames) != len(tbs): continue
for sentry_frame, tb in zip(sentry_frames, tbs): frame = tb.tb_frame source = executing.Source.for_frame(frame) sentry_frame["function"] = source.code_qualname(frame.f_code)
return event
|