Ver código fonte

feat(新增银行流水水印去除工具): 添加支持 PDF 和常见图片格式的水印去除功能

zhch158_admin 1 semana atrás
pai
commit
d154ca288c
1 arquivos alterados com 406 adições e 0 exclusões
  1. 406 0
      ocr_tools/remove_watermark_tool/remove_watermark.py

+ 406 - 0
ocr_tools/remove_watermark_tool/remove_watermark.py

@@ -0,0 +1,406 @@
+"""
+银行流水水印去除工具
+
+支持 PDF 和常见图片格式(jpg/png/tif/bmp/webp)。
+- 输入 PDF → 输出去水印 PDF(扫描件)或直接复制(文字型)
+- 输入图片 → 输出去水印图片(保持原格式)
+适用于福建农信、邮储银行等带有半透明文字水印的银行流水单。
+
+用法:
+    # 处理单个 PDF 或图片
+    python remove_watermark.py input.pdf
+    python remove_watermark.py input.jpg
+
+    # 指定输出路径
+    python remove_watermark.py input.pdf -o output.pdf
+
+    # 指定页面范围(支持 "1-5,7,9-12" 格式)
+    python remove_watermark.py input.pdf --page-range 1-3
+
+    # 调整去除阈值(默认 160,范围建议 140-180)
+    python remove_watermark.py input.pdf --threshold 170
+
+    # 批量处理目录下所有 PDF 和图片
+    python remove_watermark.py /path/to/dir/ --batch
+
+    # 预览单页/图片效果(不保存,直接展示对比图)
+    python remove_watermark.py input.pdf --preview --page 0
+    python remove_watermark.py input.jpg --preview
+"""
+import argparse
+import sys
+from pathlib import Path
+from typing import Optional
+
+# 将 ocr_platform 根目录加入 sys.path,以便导入 ocr_utils
+_repo_root = Path(__file__).parents[2]
+if str(_repo_root) not in sys.path:
+    sys.path.insert(0, str(_repo_root))
+
+from loguru import logger
+from ocr_utils.watermark_utils import (
+    detect_watermark,
+    remove_watermark_from_image,
+    scan_pdf_watermark_xobjs,
+    remove_txt_pdf_watermark,
+)
+
+# 支持的图片后缀(小写)
+IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp"}
+
+
+def _try_remove_txt_pdf_watermark(input_path: Path, output_path: Path) -> int:
+    """
+    对文字型 PDF 执行原生水印去除,保留文字可搜索性。
+
+    内部委托给 watermark_utils.remove_txt_pdf_watermark() 完成内存流处理,
+    有水印时将结果写入 output_path。
+
+    流程:
+    1. scan_pdf_watermark_xobjs() 快速扫描前 3 页,无水印直接返回 0
+    2. remove_txt_pdf_watermark() 执行全量去除,返回 bytes 或 None
+    3. 有水印时写 output_path
+
+    Returns:
+        1 表示去除成功,0 表示未发现水印
+    """
+    pdf_bytes = input_path.read_bytes()
+
+    if not scan_pdf_watermark_xobjs(pdf_bytes, sample_pages=3):
+        return 0
+
+    cleaned = remove_txt_pdf_watermark(pdf_bytes)
+    if cleaned is None:
+        return 0
+
+    output_path.write_bytes(cleaned)
+    return 1
+
+
+
+def process_document(
+    input_path: Path,
+    output_path: Path,
+    threshold: int = 160,
+    morph_close_kernel: int = 0,
+    dpi: int = 200,
+    page_range: Optional[str] = None,
+    force_image: bool = False,
+) -> int:
+    """
+    统一处理函数:支持 PDF(扫描件)和图片,去除水印后保存。
+
+    使用 PDFUtils.load_and_classify_document 加载并分类:
+    - 文字型 PDF(pdf_type='txt'):优先尝试原生 XObject 水印去除(保留可搜索性);
+      失败时自动回退图像化处理,或 force_image=True 时直接走图像处理
+    - 扫描件 PDF(pdf_type='ocr'):逐页去水印后重新打包为 PDF
+    - 图片:检测水印后去除并保存
+
+    Args:
+        input_path: 输入文件路径(PDF 或图片)
+        output_path: 输出文件路径
+        threshold: 灰度阈值(140-180),越大保守,越小激进
+        morph_close_kernel: 形态学闭运算核大小,0 跳过
+        dpi: PDF 渲染分辨率
+        page_range: 页面范围字符串,如 "1-5,7,9-12"(从 1 开始,仅对 PDF 有效)
+        force_image: 强制对文字型 PDF 使用图像化处理(会失去文字可搜索性,
+                     但能处理水印嵌在内容流中的情况)
+
+    Returns:
+        实际处理的页/图片数
+    """
+    import shutil
+    import numpy as np
+    from io import BytesIO
+    from PIL import Image
+    from ocr_utils.pdf_utils import PDFUtils
+
+    is_pdf = input_path.suffix.lower() == ".pdf"
+
+    # 统一加载 + 分类(PDF 用 MinerU pdf_classify,图片直接读取)
+    images, pdf_type, pdf_doc, renderer = PDFUtils.load_and_classify_document(
+        input_path, dpi=dpi, page_range=page_range
+    )
+
+    # _known_has_wm: 当 txt 分支已确认有水印时设为 True,避免公共段用更严格阈值误判
+    _known_has_wm: Optional[bool] = None
+
+    # 文字型 PDF:优先尝试原生 XObject 水印去除,保留可搜索性
+    if is_pdf and pdf_type == "txt" and not force_image:
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        removed = _try_remove_txt_pdf_watermark(input_path, output_path)
+        if removed > 0:
+            logger.info(
+                f"✅ 文字型 PDF '{input_path.name}':删除 {removed} 个水印 XObject,"
+                "保留文字可搜索性,已保存。"
+            )
+            return removed
+
+        # XObject 扫描无结果,用较低阈值(0.5%)做图像水印检测二次确认
+        # 文字 PDF 背景干净,降低阈值以检测稀疏文字水印
+        first_np = np.array(images[0]["img_pil"])
+        if detect_watermark(first_np, ratio_threshold=0.005):
+            logger.warning(
+                f"⚠️  文字型 PDF '{input_path.name}':未找到 XObject 水印,"
+                "但图像检测发现水印(内联内容流水印),"
+                "回退为图像化处理(输出将失去文字可搜索性)。"
+            )
+            _known_has_wm = True  # 明确检测到水印,跳过公共段二次检测
+        else:
+            logger.info(
+                f"✅ 文字型 PDF '{input_path.name}':未检测到水印,直接复制。"
+            )
+            shutil.copy2(str(input_path), str(output_path))
+            return 0
+    elif is_pdf and pdf_type == "txt" and force_image:
+        logger.warning(
+            f"⚠️  文字型 PDF '{input_path.name}':--force-image 模式,"
+            "强制图像化处理(输出将失去文字可搜索性)。"
+        )
+        _known_has_wm = True  # force_image 模式不再检测,直接去除
+
+    logger.info(
+        f"{'📄' if is_pdf else '🖼️ '} 处理: {input_path.name}  "
+        f"共 {len(images)} {'页' if is_pdf else '张'}  threshold={threshold}"
+    )
+
+    # 水印检测(仅用第一页/图判断,同一文档水印通常一致)
+    # _known_has_wm 已在 txt 分支设置时,跳过重复检测
+    if _known_has_wm is not None:
+        has_wm = _known_has_wm
+        logger.info("🔍 检测到水印,启动去水印处理" if has_wm else "✅ 未检测到水印,跳过")
+    else:
+        first_np = np.array(images[0]["img_pil"])
+        # 扫描件/图片路径:使用宽松一档的中间调阈值(2.5%)以避免边界误判,
+        # 斜向直线验证仍作为双重保险防止误报
+        has_wm = detect_watermark(first_np, ratio_threshold=0.025)
+        if has_wm:
+            logger.info("🔍 检测到水印,启动去水印处理")
+        else:
+            logger.info("✅ 未检测到水印,跳过去水印处理")
+            if not is_pdf:
+                # 图片无水印:直接复制
+                output_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy2(str(input_path), str(output_path))
+                return 1
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    if is_pdf:
+        # 逐页处理后重新打包为 PDF
+        try:
+            import fitz
+        except ImportError:
+            raise ImportError("请安装 PyMuPDF: pip install PyMuPDF")
+
+        new_doc = fitz.open()
+        for i, img_dict in enumerate(images):
+            pil_img = img_dict["img_pil"]
+            img_np = np.array(pil_img)
+
+            if has_wm:
+                cleaned_gray = remove_watermark_from_image(
+                    img_np, threshold=threshold,
+                    morph_close_kernel=morph_close_kernel, return_pil=False,
+                )
+                out_pil = Image.fromarray(cleaned_gray).convert("RGB")
+            else:
+                out_pil = pil_img
+
+            buf = BytesIO()
+            out_pil.save(buf, format="PNG", optimize=False)
+            buf.seek(0)
+
+            # 按渲染图尺寸创建新页面(保持原始 DPI 尺寸)
+            w_px, h_px = out_pil.size
+            new_page = new_doc.new_page(width=w_px * 72 / dpi, height=h_px * 72 / dpi)
+            new_page.insert_image(new_page.rect, stream=buf.read())
+
+            if (i + 1) % 10 == 0 or i == len(images) - 1:
+                logger.info(f"  进度: {i + 1}/{len(images)}")
+
+        new_doc.save(str(output_path), garbage=4, deflate=True)
+    else:
+        # 图片:有水印则去除后保存
+        img_np = np.array(images[0]["img_pil"])
+        cleaned_gray = remove_watermark_from_image(
+            img_np, threshold=threshold,
+            morph_close_kernel=morph_close_kernel, return_pil=False,
+        )
+        Image.fromarray(cleaned_gray, mode="L").save(str(output_path))
+
+    logger.info(f"✅ 保存到: {output_path}")
+    return len(images)
+
+
+def preview_page(
+    input_path: Path,
+    page_idx: int = 0,
+    threshold: int = 160,
+    dpi: int = 200,
+):
+    """展示单页原图与去水印对比(需要 matplotlib)。支持 PDF 和图片文件。"""
+    try:
+        import numpy as np
+        import matplotlib.pyplot as plt
+        import matplotlib
+        matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'sans-serif']
+        matplotlib.rcParams['axes.unicode_minus'] = False
+    except ImportError as e:
+        raise ImportError(f"预览需要 matplotlib: {e}")
+
+    suffix = input_path.suffix.lower()
+
+    if suffix == ".pdf":
+        try:
+            import fitz
+        except ImportError:
+            raise ImportError("PDF 预览需要 PyMuPDF: pip install PyMuPDF")
+        doc = fitz.open(str(input_path))
+        if page_idx >= len(doc):
+            raise ValueError(f"页码 {page_idx} 超出范围(共 {len(doc)} 页)")
+        mat = fitz.Matrix(dpi / 72, dpi / 72)
+        page = doc[page_idx]
+        pix = page.get_pixmap(matrix=mat, alpha=False)
+        img_np = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, 3)
+        title_orig = f"原图  第 {page_idx + 1} 页"
+    elif suffix in IMAGE_SUFFIXES:
+        from PIL import Image
+        img_np = np.array(Image.open(str(input_path)).convert("RGB"))
+        title_orig = f"原图  {input_path.name}"
+    else:
+        raise ValueError(f"不支持的文件格式: {suffix}")
+
+    cleaned = remove_watermark_from_image(img_np, threshold=threshold, return_pil=False)
+
+    fig, axes = plt.subplots(1, 2, figsize=(20, 14))
+    axes[0].imshow(img_np)
+    axes[0].set_title(title_orig, fontsize=14)
+    axes[0].axis('off')
+
+    axes[1].imshow(cleaned, cmap='gray')
+    axes[1].set_title(f"去水印后  threshold={threshold}", fontsize=14)
+    axes[1].axis('off')
+
+    plt.tight_layout()
+    plt.show()
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="银行流水水印去除工具",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+    parser.add_argument("input", type=Path, help="输入 PDF / 图片文件或目录(批量模式)")
+    parser.add_argument("-o", "--output", type=Path, default=None,
+                        help="输出路径(单文件模式;默认在原文件名后加 _cleaned)")
+    parser.add_argument("--threshold", type=int, default=160,
+                        help="灰度阈值 (140-180),默认 160")
+    parser.add_argument("--morph-kernel", type=int, default=2,
+                        help="形态学闭运算核大小,0 跳过,默认 2")
+    parser.add_argument("--dpi", type=int, default=200,
+                        help="渲染 DPI,默认 200")
+    parser.add_argument("--batch", action="store_true",
+                        help="批量模式:处理目录下所有 PDF 和图片")
+    parser.add_argument("--preview", action="store_true",
+                        help="预览模式:展示单页对比图(不保存)")
+    parser.add_argument("--page", type=int, default=0,
+                        help="预览页码(0-based),默认第 0 页")
+    parser.add_argument("--page-range", type=str, default=None,
+                        help="处理页面范围,如 '1-3,5,7-9'(从 1 开始,仅对 PDF 有效)")
+    parser.add_argument("--force-image", action="store_true",
+                        help="强制对文字型 PDF 使用图像化处理(会失去可搜索性,适用于 XObject 方法无法去除的内联水印)")
+
+    args = parser.parse_args()
+
+    if args.preview:
+        preview_page(
+            args.input,
+            page_idx=args.page,
+            threshold=args.threshold,
+            dpi=args.dpi,
+        )
+        return
+
+    if args.batch:
+        # 批量模式:处理目录下所有 PDF 和图片
+        input_dir = args.input
+        if not input_dir.is_dir():
+            logger.error(f"批量模式需要传入目录: {input_dir}")
+            sys.exit(1)
+
+        # 收集所有支持的文件
+        all_files: list[Path] = sorted(input_dir.glob("*.pdf"))
+        for ext in IMAGE_SUFFIXES:
+            all_files.extend(sorted(input_dir.glob(f"*{ext}")))
+            all_files.extend(sorted(input_dir.glob(f"*{ext.upper()}")))
+        all_files = sorted(set(all_files))
+
+        if not all_files:
+            logger.warning(f"目录中没有可处理的文件(PDF/图片): {input_dir}")
+            return
+        out_dir = args.output or input_dir / "cleaned"
+        out_dir.mkdir(parents=True, exist_ok=True)
+        for file in all_files:
+            out_file = out_dir / f"{file.stem}_cleaned{file.suffix}"
+            try:
+                process_document(file, out_file, args.threshold, args.morph_kernel, args.dpi, args.page_range, args.force_image)
+            except Exception as e:
+                logger.error(f"❌ 处理失败 {file.name}: {e}")
+        logger.info(f"✅ 批量处理完成,共 {len(all_files)} 个文件 -> {out_dir}")
+    else:
+        # 单文件模式
+        input_path = args.input
+        if not input_path.is_file():
+            logger.error(f"文件不存在: {input_path}")
+            sys.exit(1)
+        output_path = args.output or input_path.with_name(
+            f"{input_path.stem}_cleaned{input_path.suffix}"
+        )
+        suffix = input_path.suffix.lower()
+        if suffix == ".pdf" or suffix in IMAGE_SUFFIXES:
+            process_document(input_path, output_path, args.threshold, args.morph_kernel, args.dpi, args.page_range, args.force_image)
+        else:
+            logger.error(f"不支持的文件格式: {suffix},支持 PDF 和 {IMAGE_SUFFIXES}")
+            sys.exit(1)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 1:
+        print("ℹ️  未提供命令行参数,使用默认配置运行...")
+
+        # 默认配置(用于开发测试)
+        default_config = {
+            # 测试输入
+            # "input": "/Users/zhch158/workspace/data/流水分析/杨万益_福建农信.pdf",
+            # "input": "Users/zhch158/workspace/data/流水分析/提取自杨万益_福建农信.png",
+            
+            # 文字PDF测试
+            # "input": "/Users/zhch158/workspace/data/流水分析/提取自赤峰黄金2023年报.pdf",
+            # "input": "/Users/zhch158/workspace/data/测试文字PDF-水印.pdf",
+            "input": "/Users/zhch158/workspace/data/非结构化文档识别统一平台(ocr_platform)-交易流水识别,财报识别.pdf",
+            # "output": "./output/杨万益_福建农信",
+            # 页面范围(可选,支持 "1-5,7" 语法,仅对 PDF 有效)
+            # "page_range": "3",  # 仅处理第 1 页(对应 --page-range 参数)
+            "dpi": 200,
+            "threshold": 160,
+            "morph_kernel": 0,  # 遮罩替换模式下不需要闭运算
+            # "preview": True,
+        }
+
+        # 构造参数(注意 input 是位置参数,morph_kernel 对应 --morph-kernel)
+        sys.argv = [sys.argv[0], default_config["input"]]
+        skip_keys = {"input"}
+        for key, value in default_config.items():
+            if key in skip_keys:
+                continue
+            # 将下划线转换为连字符(如 morph_kernel -> morph-kernel)
+            flag = f"--{key.replace('_', '-')}"
+            if isinstance(value, bool):
+                if value:
+                    sys.argv.append(flag)
+            else:
+                sys.argv.extend([flag, str(value)])
+
+    sys.exit(main())