file_extract.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from __future__ import annotations
  2. import uuid
  3. from dataclasses import dataclass
  4. @dataclass
  5. class ExtractedFile:
  6. doc_id: str
  7. source_label: str
  8. text: str
  9. warning: str | None = None
  10. def _safe_decode(data: bytes) -> str:
  11. for enc in ("utf-8", "utf-8-sig", "gbk"):
  12. try:
  13. return data.decode(enc)
  14. except UnicodeDecodeError:
  15. continue
  16. return data.decode("utf-8", errors="replace")
  17. def extract_text_from_upload(*, filename: str, data: bytes) -> ExtractedFile:
  18. name = (filename or "upload").strip() or "upload"
  19. lower = name.lower()
  20. doc_id = f"file-{uuid.uuid4().hex[:12]}"
  21. if lower.endswith(".pdf"):
  22. try:
  23. from io import BytesIO
  24. from pypdf import PdfReader
  25. reader = PdfReader(BytesIO(data))
  26. parts: list[str] = []
  27. for page in reader.pages:
  28. t = page.extract_text()
  29. if t:
  30. parts.append(t)
  31. text = "\n".join(parts).strip()
  32. warn = None
  33. if not text:
  34. warn = "PDF 未解析出文本(可能为扫描件,需先 OCR)"
  35. return ExtractedFile(
  36. doc_id=doc_id, source_label=name, text=text or "", warning=warn
  37. )
  38. except Exception as e: # noqa: BLE001
  39. return ExtractedFile(
  40. doc_id=doc_id,
  41. source_label=name,
  42. text="",
  43. warning=f"PDF 解析失败: {e}",
  44. )
  45. text = _safe_decode(data).strip()
  46. return ExtractedFile(doc_id=doc_id, source_label=name, text=text)