_progress.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. from __future__ import annotations
  2. import io
  3. from typing import Callable
  4. from typing_extensions import override
  5. class CancelledError(Exception):
  6. def __init__(self, msg: str) -> None:
  7. self.msg = msg
  8. super().__init__(msg)
  9. @override
  10. def __str__(self) -> str:
  11. return self.msg
  12. __repr__ = __str__
  13. class BufferReader(io.BytesIO):
  14. def __init__(self, buf: bytes = b"", desc: str | None = None) -> None:
  15. super().__init__(buf)
  16. self._len = len(buf)
  17. self._progress = 0
  18. self._callback = progress(len(buf), desc=desc)
  19. def __len__(self) -> int:
  20. return self._len
  21. @override
  22. def read(self, n: int | None = -1) -> bytes:
  23. chunk = io.BytesIO.read(self, n)
  24. self._progress += len(chunk)
  25. try:
  26. self._callback(self._progress)
  27. except Exception as e: # catches exception from the callback
  28. raise CancelledError("The upload was cancelled: {}".format(e)) from e
  29. return chunk
  30. def progress(total: float, desc: str | None) -> Callable[[float], None]:
  31. import tqdm
  32. meter = tqdm.tqdm(total=total, unit_scale=True, desc=desc)
  33. def incr(progress: float) -> None:
  34. meter.n = progress
  35. if progress == total:
  36. meter.close()
  37. else:
  38. meter.refresh()
  39. return incr
  40. def MB(i: int) -> int:
  41. return int(i // 1024**2)