"""Unified interface for layout detection using ONNX Runtime."""
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import onnxruntime as ort
class LayoutDetectorONNX:
    """Layout detector that runs an exported ONNX model through ONNX Runtime.

    Typical use: construct with a model path, call :meth:`predict` on a BGR
    image, and optionally pass the results to :meth:`visualize`.
    """

    # Class-id -> name mapping. Per the original author's note this follows
    # the official RT-DETR-H_layout_17cls category definition.
    CATEGORY_NAMES = {
        0: 'paragraph_title',
        1: 'image',
        2: 'text',
        3: 'number',
        4: 'abstract',
        5: 'content',
        6: 'figure_title',
        7: 'formula',
        8: 'table',
        9: 'table_title',
        10: 'reference',
        11: 'doc_title',
        12: 'footnote',
        13: 'header',
        14: 'algorithm',
        15: 'footer',
        16: 'seal',
    }

    # Square input size used when the model does not declare static dims.
    DEFAULT_INPUT_SIZE = 640

    def __init__(self, onnx_path: str, use_gpu: bool = False):
        """Create the inference session and inspect the model signature.

        Args:
            onnx_path: Path to the ONNX model file.
            use_gpu: When True, prefer an accelerated execution provider
                (CUDA or CoreML) if this onnxruntime build offers one; the
                CPU provider is always kept as the final fallback.
        """
        if use_gpu:
            # Only request providers this build actually ships, so the
            # session does not start with "provider not available" warnings.
            available = set(ort.get_available_providers())
            providers = [
                name
                for name in ('CUDAExecutionProvider', 'CoreMLExecutionProvider')
                if name in available
            ]
            providers.append('CPUExecutionProvider')
        else:
            providers = ['CPUExecutionProvider']

        self.session = ort.InferenceSession(onnx_path, providers=providers)

        # Cache input/output metadata keyed by tensor name.
        self.inputs = {inp.name: inp for inp in self.session.get_inputs()}
        self.outputs = {out.name: out for out in self.session.get_outputs()}

        print(f"📋 Model inputs: {list(self.inputs.keys())}")
        print(f"📋 Model outputs: {list(self.outputs.keys())}")

        # Infer the (square) input resolution from the model signature.
        self.target_size = self._detect_input_size()
        print(f"🎯 Detected target size: {self.target_size}")

        # Dump every input's declared shape and dtype for debugging.
        for name, inp in self.inputs.items():
            print(f" - {name}: shape={inp.shape}, dtype={inp.type}")

    def _image_input_name(self) -> str:
        """Return the name of the input tensor that receives the image.

        Preference order: 'image', then 'images', then the first declared
        input. Raises IndexError when the model declares no inputs.
        """
        for candidate in ('image', 'images'):
            if candidate in self.inputs:
                return candidate
        return list(self.inputs)[0]

    def _detect_input_size(self) -> int:
        """Infer the model's input size from the image tensor's declared shape.

        Returns:
            The first positive static integer among the dimensions after the
            second one (shape is usually [batch, channels, height, width]),
            or ``DEFAULT_INPUT_SIZE`` when none is declared (e.g. dynamic
            axes) or the model has no inputs.
        """
        if not self.inputs:
            return self.DEFAULT_INPUT_SIZE

        # Look at the same tensor that preprocess() will feed the image into.
        shape = self.inputs[self._image_input_name()].shape
        if len(shape) >= 3:
            for dim in shape[2:]:
                # Dynamic axes are reported as strings/None; skip them.
                if isinstance(dim, int) and dim > 0:
                    return dim

        return self.DEFAULT_INPUT_SIZE

    def preprocess(
        self,
        img: np.ndarray
    ) -> Tuple[Dict[str, np.ndarray], Tuple[float, float], Tuple[int, int]]:
        """Convert a BGR image into the model's input feed.

        The steps follow the settings the original author took from
        inference.yml: resize without keeping the aspect ratio, BGR -> RGB,
        divide by 255 with no mean/std normalisation, then HWC -> NCHW.

        Args:
            img: Input image in BGR channel order.

        Returns:
            A tuple ``(input_dict, (scale_h, scale_w), (orig_h, orig_w))``:
            the feed dict for ``session.run``, the per-axis ratio
            original size / target size used to map boxes back, and the
            original image size.
        """
        orig_h, orig_w = img.shape[:2]
        target_size = self.target_size

        # Stretch straight to target_size x target_size (keep_ratio: false).
        # NOTE(review): the original comment cites `interp: 2` from
        # inference.yml; in OpenCV the value 2 is INTER_CUBIC while
        # INTER_LINEAR is 1 - confirm which interpolation the config expects.
        img_resized = cv2.resize(
            img,
            (target_size, target_size),
            interpolation=cv2.INTER_LINEAR
        )

        img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)

        # Scale to [0, 1] only; mean=[0,0,0], std=[1,1,1], norm_type=none.
        img_normalized = img_rgb.astype(np.float32) / 255.0

        # HWC -> CHW, add the batch axis, and hand over a contiguous buffer.
        img_chw = img_normalized.transpose(2, 0, 1)
        img_tensor = np.ascontiguousarray(img_chw[None, ...], dtype=np.float32)

        input_dict = {self._image_input_name(): img_tensor}

        # Per-axis ratio original size / target size.
        scale_h = orig_h / target_size
        scale_w = orig_w / target_size

        # Optional auxiliary inputs, fed only when the model declares them.
        if 'im_shape' in self.inputs:
            input_dict['im_shape'] = np.array(
                [[float(orig_h), float(orig_w)]], dtype=np.float32
            )

        if 'scale_factor' in self.inputs:
            # Note: this is original size / target size, as in the return value.
            input_dict['scale_factor'] = np.array(
                [[scale_h, scale_w]], dtype=np.float32
            )

        # The aspect ratio is not preserved, so x and y scales are returned
        # separately for coordinate restoration in postprocess().
        return input_dict, (scale_h, scale_w), (orig_h, orig_w)

    @staticmethod
    def _split_combined(pred: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Split rows of [label_id, score, x1, y1, x2, y2] into (labels, scores, bboxes)."""
        labels = pred[:, 0].astype(int)
        scores = pred[:, 1]
        bboxes = pred[:, 2:6].copy()  # copy: boxes are rescaled in place later
        return labels, scores, bboxes

    @staticmethod
    def _split_class_scores(scores_all: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Reduce a (num_boxes, num_classes) score matrix to (labels, scores)."""
        return scores_all.argmax(axis=1), scores_all.max(axis=1)

    def _parse_outputs(
        self,
        outputs: List[np.ndarray]
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Recognise the output layout and return (labels, scores, bboxes).

        Raises:
            ValueError: If the number or shape of outputs is not recognised.
        """
        if len(outputs) >= 2:
            shape0 = outputs[0].shape
            shape1 = outputs[1].shape

            # Combined format: (num_boxes, 6) rows of
            # [label_id, score, x1, y1, x2, y2].
            if len(shape0) == 2 and shape0[1] == 6:
                print(" ✅ Detected RT-DETR ONNX format: (num_boxes, 6) [label, score, x1, y1, x2, y2]")
                return self._split_combined(outputs[0])

            # Combined format with a leading batch axis: (batch, num_boxes, 6).
            if len(shape0) == 3 and shape0[2] == 6:
                print(" ✅ Detected batched RT-DETR format: (batch, num_boxes, 6)")
                return self._split_combined(outputs[0][0])

            # Separate format: output0 holds boxes, output1 holds scores.
            if len(shape0) == 2 and shape0[1] == 4:
                print(" ✅ Detected separate format: bboxes + scores")
                bboxes = outputs[0].copy()
                if len(shape1) == 1:
                    # One score per box and no class information.
                    scores = outputs[1]
                    labels = np.zeros(len(scores), dtype=int)
                elif len(shape1) == 2:
                    labels, scores = self._split_class_scores(outputs[1])
                else:
                    raise ValueError(f"Unexpected output1 shape: {shape1}")
                return labels, scores, bboxes

            # Separate format with batch axis:
            # (batch, num_boxes, 4) + (batch, num_boxes, num_classes).
            if len(shape0) == 3 and shape0[2] == 4:
                print(" ✅ Detected RT-DETR separate format")
                bboxes = outputs[0][0].copy()
                labels, scores = self._split_class_scores(outputs[1][0])
                return labels, scores, bboxes

            raise ValueError(f"Unexpected output format: {shape0}, {shape1}")

        if len(outputs) == 1:
            shape = outputs[0].shape

            if len(shape) == 2 and shape[1] == 6:
                print(" ✅ Detected single RT-DETR output: (num_boxes, 6)")
                return self._split_combined(outputs[0])

            if len(shape) == 3 and shape[2] == 6:
                print(" ✅ Detected single batched output: (batch, num_boxes, 6)")
                return self._split_combined(outputs[0][0])

            raise ValueError(f"Unexpected single output shape: {shape}")

        raise ValueError(f"Unexpected number of outputs: {len(outputs)}")

    def postprocess(
        self,
        outputs: List[np.ndarray],
        scale: Tuple[float, float],  # (scale_h, scale_w)
        orig_shape: Tuple[int, int],
        conf_threshold: float = 0.5
    ) -> List[Dict]:
        """Turn raw model outputs into a list of detection dicts.

        Args:
            outputs: Arrays returned by the ONNX session.
            scale: ``(scale_h, scale_w)`` = (original height / target size,
                original width / target size), as returned by preprocess().
            orig_shape: Original image size ``(h, w)``.
            conf_threshold: Minimum score. If no box reaches it, it is lowered
                to ``max(max_score * 0.5, 0.05)`` so the best boxes survive.

        Returns:
            One dict per kept box with keys ``category_id``,
            ``category_name``, ``bbox`` ([x1, y1, x2, y2]), ``poly``,
            ``score``, ``width`` and ``height``. Empty when the model
            produced no boxes.

        Raises:
            ValueError: If the output layout is not recognised.
        """
        # Debug dump of the raw outputs.
        print(f" 📊 Processing {len(outputs)} outputs")
        for i, output in enumerate(outputs):
            # min()/max() raise on zero-size arrays, so describe those separately.
            if output.size:
                value_range = f"[{output.min():.2f}, {output.max():.2f}]"
            else:
                value_range = "[empty]"
            print(f" Output[{i}] shape: {output.shape}, dtype: {output.dtype}, range: {value_range}")

        scale_h, scale_w = scale
        orig_h, orig_w = orig_shape

        print(f" 🔄 Scale factors: scale_h={scale_h:.3f}, scale_w={scale_w:.3f}")
        print(f" 📐 Original shape: {orig_h} x {orig_w}")

        labels, scores, bboxes = self._parse_outputs(outputs)

        # No detections at all: nothing to scale or filter, and the range
        # statistics below are undefined for empty arrays.
        if len(bboxes) == 0:
            print(" ✅ Final valid boxes: 0")
            return []

        print(f" 📦 Parsed: {len(bboxes)} boxes, score range: [{scores.min():.6f}, {scores.max():.6f}]")
        print(f" 📏 Bbox range before scaling: x=[{bboxes[:, 0].min():.1f}, {bboxes[:, 2].max():.1f}], y=[{bboxes[:, 1].min():.1f}, {bboxes[:, 3].max():.1f}]")

        # Map coordinates from the resized model input back to the original
        # image by multiplying with the per-axis scale factors.
        bboxes[:, [0, 2]] *= scale_w  # x1, x2
        bboxes[:, [1, 3]] *= scale_h  # y1, y2

        print(f" 📏 Bbox range after scaling: x=[{bboxes[:, 0].min():.1f}, {bboxes[:, 2].max():.1f}], y=[{bboxes[:, 1].min():.1f}, {bboxes[:, 3].max():.1f}]")

        # Adaptive threshold: relax it when every score is below the request.
        max_score = scores.max() if len(scores) > 0 else 0
        if max_score < conf_threshold:
            adjusted_threshold = max(max_score * 0.5, 0.05)
            print(f" ⚙️ Auto-adjusting threshold: {conf_threshold:.3f} → {adjusted_threshold:.3f} (max_score={max_score:.3f})")
            conf_threshold = adjusted_threshold

        # Drop low-confidence boxes.
        keep = scores > conf_threshold
        bboxes = bboxes[keep]
        scores = scores[keep]
        labels = labels[keep]

        print(f" ✂️ After filtering (score > {conf_threshold:.3f}): {len(bboxes)} boxes")

        # Drop boxes lying entirely outside the image.
        inside = (
            (bboxes[:, 2] > 0) &        # x2 > 0
            (bboxes[:, 3] > 0) &        # y2 > 0
            (bboxes[:, 0] < orig_w) &   # x1 < width
            (bboxes[:, 1] < orig_h)     # y1 < height
        )
        bboxes = bboxes[inside]
        scores = scores[inside]
        labels = labels[inside]

        print(f" 🗺️ After spatial filtering: {len(bboxes)} boxes")

        # Clip the remaining boxes to the image bounds.
        bboxes[:, [0, 2]] = np.clip(bboxes[:, [0, 2]], 0, orig_w)
        bboxes[:, [1, 3]] = np.clip(bboxes[:, [1, 3]], 0, orig_h)

        img_area = orig_w * orig_h
        results = []
        for box, score, label in zip(bboxes, scores, labels):
            x1, y1, x2, y2 = box
            width = x2 - x1
            height = y2 - y1

            # Skip boxes that are too small to be meaningful.
            if width < 10 or height < 10:
                continue

            # Skip boxes covering (almost) the whole page.
            if width * height > img_area * 0.95:
                continue

            results.append({
                'category_id': int(label),
                'category_name': self.CATEGORY_NAMES.get(int(label), f'unknown_{label}'),
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'poly': [int(x1), int(y1), int(x2), int(y1), int(x2), int(y2), int(x1), int(y2)],
                'score': float(score),
                'width': int(width),
                'height': int(height)
            })

        print(f" ✅ Final valid boxes: {len(results)}")

        return results

    def predict(
        self,
        img: np.ndarray,
        conf_threshold: float = 0.05  # deliberately low default
    ) -> List[Dict]:
        """Run the full pipeline: preprocess, ONNX inference, postprocess.

        Args:
            img: Input image in BGR channel order.
            conf_threshold: Score threshold (default 0.05; postprocess() may
                lower it further when no box reaches it).

        Returns:
            List of detection dicts as produced by postprocess().
        """
        input_dict, scale, orig_shape = self.preprocess(img)

        # Debug: shapes of everything fed to the model.
        for name, tensor in input_dict.items():
            print(f" Input '{name}' shape: {tensor.shape}")

        output_names = [out.name for out in self.session.get_outputs()]
        outputs = self.session.run(output_names, input_dict)

        # Debug: shapes of everything the model returned.
        for i, output in enumerate(outputs):
            print(f" Output {i} shape: {output.shape}")

        return self.postprocess(outputs, scale, orig_shape, conf_threshold)

    def visualize(
        self,
        img: np.ndarray,
        results: List[Dict],
        output_path: Optional[str] = None
    ) -> np.ndarray:
        """Draw detection boxes and labels on a copy of the image.

        Args:
            img: Input image (left unmodified).
            results: Detections as returned by predict()/postprocess().
            output_path: Optional file path; when given, the annotated image
                is written there and parent directories are created.

        Returns:
            The annotated copy of the image.
        """
        img_vis = img.copy()

        # Palette cycled by category id. OpenCV draws these tuples in the
        # image's own channel order, so for a BGR image (255, 0, 0) is blue.
        colors = [
            (255, 0, 0),
            (0, 255, 0),
            (0, 0, 255),
            (255, 255, 0),
            (255, 0, 255),
            (0, 255, 255),
            (128, 0, 128),
            (128, 128, 0),
            (0, 128, 128),
            (255, 128, 0),
        ]

        for res in results:
            x1, y1, x2, y2 = res['bbox']
            color = colors[res['category_id'] % len(colors)]

            # Box outline.
            cv2.rectangle(img_vis, (x1, y1), (x2, y2), color, 2)

            # Label text: "<category>: <score>".
            label = f"{res['category_name']}: {res['score']:.2f}"
            label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            label_w, label_h = label_size

            # Filled background strip above the box, then white text on it.
            cv2.rectangle(img_vis, (x1, y1 - label_h - 10), (x1 + label_w, y1), color, -1)
            cv2.putText(img_vis, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        if output_path:
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            # imwrite reports failure through its return value, not an exception.
            if cv2.imwrite(output_path, img_vis):
                print(f"✅ Visualization saved to: {output_path}")
            else:
                print(f"❌ Failed to save visualization to: {output_path}")

        return img_vis
# Usage example: run the detector on one page image and save a visualisation.
if __name__ == "__main__":
    # Build the detector (CPU only).
    onnx_model_path = "./Layout/RT-DETR-H_layout_17cls.onnx"
    detector = LayoutDetectorONNX(onnx_model_path, use_gpu=False)

    # Load the input image; cv2.imread returns None when loading fails.
    img_path = "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水/PaddleOCR_VL_Results/B用户_扫描流水/B用户_扫描流水_page_001.png"
    img = cv2.imread(img_path)

    if img is None:
        print(f"❌ Failed to load image: {img_path}")
        # SystemExit works even when the site-provided exit() helper is absent.
        raise SystemExit(1)

    # Run detection.
    print(f"🔄 Processing image: {img_path}")
    results = detector.predict(img, conf_threshold=0.3)

    print(f"\n✅ 检测到 {len(results)} 个区域:")
    for i, res in enumerate(results, 1):
        print(f" [{i}] {res['category_name']}: "
              f"score={res['score']:.3f}, "
              f"bbox={res['bbox']}")

    # Draw and save the annotated image.
    output_path = "/Users/zhch158/workspace/repository.git/PaddleX/zhch/sample_data/single_model_output/RT-DETR-H_layout_17cls/B用户_扫描流水_page_001_layout_onnx.png"
    img_vis = detector.visualize(img, results, output_path)

    print("\n📊 Detection Summary:")
    print(f" Total detections: {len(results)}")

    # Count detections per category and report them in name order.
    category_counts = {}
    for res in results:
        cat_name = res['category_name']
        category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

    for cat_name, count in sorted(category_counts.items()):
        print(f" - {cat_name}: {count}")