| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602 |
- # traceback_exception_init() adapted from trio
- #
- # _ExceptionPrintContext and traceback_exception_format() copied from the standard
- # library
- from __future__ import annotations
- import collections.abc
- import sys
- import textwrap
- import traceback
- from functools import singledispatch
- from types import TracebackType
- from typing import Any, List, Optional
- from ._exceptions import BaseExceptionGroup
- max_group_width = 15
- max_group_depth = 10
- _cause_message = (
- "\nThe above exception was the direct cause of the following exception:\n\n"
- )
- _context_message = (
- "\nDuring handling of the above exception, another exception occurred:\n\n"
- )
- def _format_final_exc_line(etype, value):
- valuestr = _safe_string(value, "exception")
- if value is None or not valuestr:
- line = f"{etype}\n"
- else:
- line = f"{etype}: {valuestr}\n"
- return line
- def _safe_string(value, what, func=str):
- try:
- return func(value)
- except BaseException:
- return f"<{what} {func.__name__}() failed>"
- class _ExceptionPrintContext:
- def __init__(self):
- self.seen = set()
- self.exception_group_depth = 0
- self.need_close = False
- def indent(self):
- return " " * (2 * self.exception_group_depth)
- def emit(self, text_gen, margin_char=None):
- if margin_char is None:
- margin_char = "|"
- indent_str = self.indent()
- if self.exception_group_depth:
- indent_str += margin_char + " "
- if isinstance(text_gen, str):
- yield textwrap.indent(text_gen, indent_str, lambda line: True)
- else:
- for text in text_gen:
- yield textwrap.indent(text, indent_str, lambda line: True)
- def exceptiongroup_excepthook(
- etype: type[BaseException], value: BaseException, tb: TracebackType | None
- ) -> None:
- sys.stderr.write("".join(traceback.format_exception(etype, value, tb)))
- class PatchedTracebackException(traceback.TracebackException):
- def __init__(
- self,
- exc_type: type[BaseException],
- exc_value: BaseException,
- exc_traceback: TracebackType | None,
- *,
- limit: int | None = None,
- lookup_lines: bool = True,
- capture_locals: bool = False,
- compact: bool = False,
- _seen: set[int] | None = None,
- ) -> None:
- kwargs: dict[str, Any] = {}
- if sys.version_info >= (3, 10):
- kwargs["compact"] = compact
- is_recursive_call = _seen is not None
- if _seen is None:
- _seen = set()
- _seen.add(id(exc_value))
- self.stack = traceback.StackSummary.extract(
- traceback.walk_tb(exc_traceback),
- limit=limit,
- lookup_lines=lookup_lines,
- capture_locals=capture_locals,
- )
- self.exc_type = exc_type
- # Capture now to permit freeing resources: only complication is in the
- # unofficial API _format_final_exc_line
- self._str = _safe_string(exc_value, "exception")
- try:
- self.__notes__ = getattr(exc_value, "__notes__", None)
- except KeyError:
- # Workaround for https://github.com/python/cpython/issues/98778 on Python
- # <= 3.9, and some 3.10 and 3.11 patch versions.
- HTTPError = getattr(sys.modules.get("urllib.error", None), "HTTPError", ())
- if sys.version_info[:2] <= (3, 11) and isinstance(exc_value, HTTPError):
- self.__notes__ = None
- else:
- raise
- if exc_type and issubclass(exc_type, SyntaxError):
- # Handle SyntaxError's specially
- self.filename = exc_value.filename
- lno = exc_value.lineno
- self.lineno = str(lno) if lno is not None else None
- self.text = exc_value.text
- self.offset = exc_value.offset
- self.msg = exc_value.msg
- if sys.version_info >= (3, 10):
- end_lno = exc_value.end_lineno
- self.end_lineno = str(end_lno) if end_lno is not None else None
- self.end_offset = exc_value.end_offset
- elif (
- exc_type
- and issubclass(exc_type, (NameError, AttributeError))
- and getattr(exc_value, "name", None) is not None
- ):
- suggestion = _compute_suggestion_error(exc_value, exc_traceback)
- if suggestion:
- self._str += f". Did you mean: '{suggestion}'?"
- if lookup_lines:
- # Force all lines in the stack to be loaded
- for frame in self.stack:
- frame.line
- self.__suppress_context__ = (
- exc_value.__suppress_context__ if exc_value is not None else False
- )
- # Convert __cause__ and __context__ to `TracebackExceptions`s, use a
- # queue to avoid recursion (only the top-level call gets _seen == None)
- if not is_recursive_call:
- queue = [(self, exc_value)]
- while queue:
- te, e = queue.pop()
- if e and e.__cause__ is not None and id(e.__cause__) not in _seen:
- cause = PatchedTracebackException(
- type(e.__cause__),
- e.__cause__,
- e.__cause__.__traceback__,
- limit=limit,
- lookup_lines=lookup_lines,
- capture_locals=capture_locals,
- _seen=_seen,
- )
- else:
- cause = None
- if compact:
- need_context = (
- cause is None and e is not None and not e.__suppress_context__
- )
- else:
- need_context = True
- if (
- e
- and e.__context__ is not None
- and need_context
- and id(e.__context__) not in _seen
- ):
- context = PatchedTracebackException(
- type(e.__context__),
- e.__context__,
- e.__context__.__traceback__,
- limit=limit,
- lookup_lines=lookup_lines,
- capture_locals=capture_locals,
- _seen=_seen,
- )
- else:
- context = None
- # Capture each of the exceptions in the ExceptionGroup along with each
- # of their causes and contexts
- if e and isinstance(e, BaseExceptionGroup):
- exceptions = []
- for exc in e.exceptions:
- texc = PatchedTracebackException(
- type(exc),
- exc,
- exc.__traceback__,
- lookup_lines=lookup_lines,
- capture_locals=capture_locals,
- _seen=_seen,
- )
- exceptions.append(texc)
- else:
- exceptions = None
- te.__cause__ = cause
- te.__context__ = context
- te.exceptions = exceptions
- if cause:
- queue.append((te.__cause__, e.__cause__))
- if context:
- queue.append((te.__context__, e.__context__))
- if exceptions:
- queue.extend(zip(te.exceptions, e.exceptions))
- def format(self, *, chain=True, _ctx=None, **kwargs):
- if _ctx is None:
- _ctx = _ExceptionPrintContext()
- output = []
- exc = self
- if chain:
- while exc:
- if exc.__cause__ is not None:
- chained_msg = _cause_message
- chained_exc = exc.__cause__
- elif exc.__context__ is not None and not exc.__suppress_context__:
- chained_msg = _context_message
- chained_exc = exc.__context__
- else:
- chained_msg = None
- chained_exc = None
- output.append((chained_msg, exc))
- exc = chained_exc
- else:
- output.append((None, exc))
- for msg, exc in reversed(output):
- if msg is not None:
- yield from _ctx.emit(msg)
- if getattr(exc, "exceptions", None) is None:
- if exc.stack:
- yield from _ctx.emit("Traceback (most recent call last):\n")
- yield from _ctx.emit(exc.stack.format())
- yield from _ctx.emit(exc.format_exception_only())
- elif _ctx.exception_group_depth > max_group_depth:
- # exception group, but depth exceeds limit
- yield from _ctx.emit(f"... (max_group_depth is {max_group_depth})\n")
- else:
- # format exception group
- is_toplevel = _ctx.exception_group_depth == 0
- if is_toplevel:
- _ctx.exception_group_depth += 1
- if exc.stack:
- yield from _ctx.emit(
- "Exception Group Traceback (most recent call last):\n",
- margin_char="+" if is_toplevel else None,
- )
- yield from _ctx.emit(exc.stack.format())
- yield from _ctx.emit(exc.format_exception_only())
- num_excs = len(exc.exceptions)
- if num_excs <= max_group_width:
- n = num_excs
- else:
- n = max_group_width + 1
- _ctx.need_close = False
- for i in range(n):
- last_exc = i == n - 1
- if last_exc:
- # The closing frame may be added by a recursive call
- _ctx.need_close = True
- if max_group_width is not None:
- truncated = i >= max_group_width
- else:
- truncated = False
- title = f"{i + 1}" if not truncated else "..."
- yield (
- _ctx.indent()
- + ("+-" if i == 0 else " ")
- + f"+---------------- {title} ----------------\n"
- )
- _ctx.exception_group_depth += 1
- if not truncated:
- yield from exc.exceptions[i].format(chain=chain, _ctx=_ctx)
- else:
- remaining = num_excs - max_group_width
- plural = "s" if remaining > 1 else ""
- yield from _ctx.emit(
- f"and {remaining} more exception{plural}\n"
- )
- if last_exc and _ctx.need_close:
- yield _ctx.indent() + "+------------------------------------\n"
- _ctx.need_close = False
- _ctx.exception_group_depth -= 1
- if is_toplevel:
- assert _ctx.exception_group_depth == 1
- _ctx.exception_group_depth = 0
- def format_exception_only(self, **kwargs):
- """Format the exception part of the traceback.
- The return value is a generator of strings, each ending in a newline.
- Normally, the generator emits a single string; however, for
- SyntaxError exceptions, it emits several lines that (when
- printed) display detailed information about where the syntax
- error occurred.
- The message indicating which exception occurred is always the last
- string in the output.
- """
- if self.exc_type is None:
- yield traceback._format_final_exc_line(None, self._str)
- return
- stype = self.exc_type.__qualname__
- smod = self.exc_type.__module__
- if smod not in ("__main__", "builtins"):
- if not isinstance(smod, str):
- smod = "<unknown>"
- stype = smod + "." + stype
- if not issubclass(self.exc_type, SyntaxError):
- yield _format_final_exc_line(stype, self._str)
- elif traceback_exception_format_syntax_error is not None:
- yield from traceback_exception_format_syntax_error(self, stype)
- else:
- yield from traceback_exception_original_format_exception_only(self)
- notes = getattr(self, "__notes__", None)
- if isinstance(notes, collections.abc.Sequence):
- for note in notes:
- note = _safe_string(note, "note")
- yield from [line + "\n" for line in note.split("\n")]
- elif notes is not None:
- yield _safe_string(notes, "__notes__", func=repr)
- traceback_exception_original_format = traceback.TracebackException.format
- traceback_exception_original_format_exception_only = (
- traceback.TracebackException.format_exception_only
- )
- traceback_exception_format_syntax_error = getattr(
- traceback.TracebackException, "_format_syntax_error", None
- )
- if sys.excepthook is sys.__excepthook__:
- traceback.TracebackException.__init__ = ( # type: ignore[assignment]
- PatchedTracebackException.__init__
- )
- traceback.TracebackException.format = ( # type: ignore[assignment]
- PatchedTracebackException.format
- )
- traceback.TracebackException.format_exception_only = ( # type: ignore[assignment]
- PatchedTracebackException.format_exception_only
- )
- sys.excepthook = exceptiongroup_excepthook
- # Ubuntu's system Python has a sitecustomize.py file that imports
- # apport_python_hook and replaces sys.excepthook.
- #
- # The custom hook captures the error for crash reporting, and then calls
- # sys.__excepthook__ to actually print the error.
- #
- # We don't mind it capturing the error for crash reporting, but we want to
- # take over printing the error. So we monkeypatch the apport_python_hook
- # module so that instead of calling sys.__excepthook__, it calls our custom
- # hook.
- #
- # More details: https://github.com/python-trio/trio/issues/1065
- if getattr(sys.excepthook, "__name__", None) in (
- "apport_excepthook",
- # on ubuntu 22.10 the hook was renamed to partial_apport_excepthook
- "partial_apport_excepthook",
- ):
- # patch traceback like above
- traceback.TracebackException.__init__ = ( # type: ignore[assignment]
- PatchedTracebackException.__init__
- )
- traceback.TracebackException.format = ( # type: ignore[assignment]
- PatchedTracebackException.format
- )
- traceback.TracebackException.format_exception_only = ( # type: ignore[assignment]
- PatchedTracebackException.format_exception_only
- )
- from types import ModuleType
- import apport_python_hook
- # monkeypatch the sys module that apport has imported
- fake_sys = ModuleType("exceptiongroup_fake_sys")
- fake_sys.__dict__.update(sys.__dict__)
- fake_sys.__excepthook__ = exceptiongroup_excepthook
- apport_python_hook.sys = fake_sys
- @singledispatch
- def format_exception_only(__exc: BaseException, **kwargs: Any) -> List[str]:
- return list(
- PatchedTracebackException(
- type(__exc), __exc, None, compact=True
- ).format_exception_only()
- )
- @format_exception_only.register
- def _(__exc: type, value: BaseException, **kwargs: Any) -> List[str]:
- return format_exception_only(value)
- @singledispatch
- def format_exception(
- __exc: BaseException, limit: Optional[int] = None, chain: bool = True, **kwargs: Any
- ) -> List[str]:
- return list(
- PatchedTracebackException(
- type(__exc), __exc, __exc.__traceback__, limit=limit, compact=True
- ).format(chain=chain)
- )
- @format_exception.register
- def _(
- __exc: type,
- value: BaseException,
- tb: TracebackType,
- limit: Optional[int] = None,
- chain: bool = True,
- **kwargs: Any,
- ) -> List[str]:
- return format_exception(value, limit, chain)
- @singledispatch
- def print_exception(
- __exc: BaseException,
- limit: Optional[int] = None,
- file: Any = None,
- chain: bool = True,
- **kwargs: Any,
- ) -> None:
- if file is None:
- file = sys.stderr
- for line in PatchedTracebackException(
- type(__exc), __exc, __exc.__traceback__, limit=limit
- ).format(chain=chain):
- print(line, file=file, end="")
- @print_exception.register
- def _(
- __exc: type,
- value: BaseException,
- tb: TracebackType,
- limit: Optional[int] = None,
- file: Any = None,
- chain: bool = True,
- ) -> None:
- print_exception(value, limit, file, chain)
- def print_exc(
- limit: Optional[int] = None,
- file: Any | None = None,
- chain: bool = True,
- ) -> None:
- value = sys.exc_info()[1]
- print_exception(value, limit, file, chain)
- # Python levenshtein edit distance code for NameError/AttributeError
- # suggestions, backported from 3.12
- _MAX_CANDIDATE_ITEMS = 750
- _MAX_STRING_SIZE = 40
- _MOVE_COST = 2
- _CASE_COST = 1
- _SENTINEL = object()
- def _substitution_cost(ch_a, ch_b):
- if ch_a == ch_b:
- return 0
- if ch_a.lower() == ch_b.lower():
- return _CASE_COST
- return _MOVE_COST
- def _compute_suggestion_error(exc_value, tb):
- wrong_name = getattr(exc_value, "name", None)
- if wrong_name is None or not isinstance(wrong_name, str):
- return None
- if isinstance(exc_value, AttributeError):
- obj = getattr(exc_value, "obj", _SENTINEL)
- if obj is _SENTINEL:
- return None
- obj = exc_value.obj
- try:
- d = dir(obj)
- except Exception:
- return None
- else:
- assert isinstance(exc_value, NameError)
- # find most recent frame
- if tb is None:
- return None
- while tb.tb_next is not None:
- tb = tb.tb_next
- frame = tb.tb_frame
- d = list(frame.f_locals) + list(frame.f_globals) + list(frame.f_builtins)
- if len(d) > _MAX_CANDIDATE_ITEMS:
- return None
- wrong_name_len = len(wrong_name)
- if wrong_name_len > _MAX_STRING_SIZE:
- return None
- best_distance = wrong_name_len
- suggestion = None
- for possible_name in d:
- if possible_name == wrong_name:
- # A missing attribute is "found". Don't suggest it (see GH-88821).
- continue
- # No more than 1/3 of the involved characters should need changed.
- max_distance = (len(possible_name) + wrong_name_len + 3) * _MOVE_COST // 6
- # Don't take matches we've already beaten.
- max_distance = min(max_distance, best_distance - 1)
- current_distance = _levenshtein_distance(
- wrong_name, possible_name, max_distance
- )
- if current_distance > max_distance:
- continue
- if not suggestion or current_distance < best_distance:
- suggestion = possible_name
- best_distance = current_distance
- return suggestion
- def _levenshtein_distance(a, b, max_cost):
- # A Python implementation of Python/suggestions.c:levenshtein_distance.
- # Both strings are the same
- if a == b:
- return 0
- # Trim away common affixes
- pre = 0
- while a[pre:] and b[pre:] and a[pre] == b[pre]:
- pre += 1
- a = a[pre:]
- b = b[pre:]
- post = 0
- while a[: post or None] and b[: post or None] and a[post - 1] == b[post - 1]:
- post -= 1
- a = a[: post or None]
- b = b[: post or None]
- if not a or not b:
- return _MOVE_COST * (len(a) + len(b))
- if len(a) > _MAX_STRING_SIZE or len(b) > _MAX_STRING_SIZE:
- return max_cost + 1
- # Prefer shorter buffer
- if len(b) < len(a):
- a, b = b, a
- # Quick fail when a match is impossible
- if (len(b) - len(a)) * _MOVE_COST > max_cost:
- return max_cost + 1
- # Instead of producing the whole traditional len(a)-by-len(b)
- # matrix, we can update just one row in place.
- # Initialize the buffer row
- row = list(range(_MOVE_COST, _MOVE_COST * (len(a) + 1), _MOVE_COST))
- result = 0
- for bindex in range(len(b)):
- bchar = b[bindex]
- distance = result = bindex * _MOVE_COST
- minimum = sys.maxsize
- for index in range(len(a)):
- # 1) Previous distance in this row is cost(b[:b_index], a[:index])
- substitute = distance + _substitution_cost(bchar, a[index])
- # 2) cost(b[:b_index], a[:index+1]) from previous row
- distance = row[index]
- # 3) existing result is cost(b[:b_index+1], a[index])
- insert_delete = min(result, distance) + _MOVE_COST
- result = min(insert_delete, substitute)
- # cost(b[:b_index+1], a[:index+1])
- row[index] = result
- if result < minimum:
- minimum = result
- if minimum > max_cost:
- # Everything in this row is too big, so bail early.
- return max_cost + 1
- return result
|