import os import io import uuid import json import zipfile import tempfile import threading import queue import shutil from pathlib import Path from PIL import Image import requests import gradio as gr import re import math import datetime # Local project imports (assumed available) from dots_ocr.utils import dict_promptmode_to_prompt from dots_ocr.utils.consts import MIN_PIXELS, MAX_PIXELS from dots_ocr.utils.demo_utils.display import read_image from dots_ocr.parser import DotsOCRParser # ---------------- Config & globals ---------------- DEFAULT_CONFIG = { "ip": "127.0.0.1", "port_vllm": 8000, "min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS, } # Absolute constraints discovered from runtime: ABS_MIN_PIXELS = 3136 ABS_MAX_PIXELS = 11289600 current_config = DEFAULT_CONFIG.copy() # default parser instance (can be overridden per-task) dots_parser = DotsOCRParser( ip=DEFAULT_CONFIG["ip"], port=DEFAULT_CONFIG["port_vllm"], dpi=200, min_pixels=DEFAULT_CONFIG["min_pixels"], max_pixels=DEFAULT_CONFIG["max_pixels"], ) RESULTS_CACHE = {} # rid -> result dict or placeholder TASK_QUEUE = queue.Queue() # Worker pool for background processing (adjustable via UI) WORKER_THREADS = [] MAX_CONCURRENCY = 6 THREAD_LOCK = threading.Lock() RETRY_COUNTS = {} # rid -> attempts MAX_AUTO_RETRIES = 5 RETRY_BACKOFF_BASE = 1.7 DEFAULT_SCRIPT_TEMPLATE = """# 高级脚本使用说明 # 提供对象: api # 日志: 使用 print(...) 或 debug(...) 输出到下方“脚本日志”实时区域。 # api.get_ids() -> [rid,...] 按当前 UI 顺序返回 # api.get_status(rid) -> {'status','ui': {'tab','nohf','source'}, 'filtered': bool, 'input_width': int, 'input_height': int} # api.get_texts(rid) -> { # 'md': 原始 Markdown, 'md_nohf': 原始 NOHF Markdown, 'json': 原始 JSON, # 'md_edit': 编辑版 Markdown 或 None, 'md_nohf_edit': 编辑版 NOHF Markdown 或 None, 'json_edit': 编辑版 JSON 或 None # } # api.choose_texts(rid, prefer_ui=True, prefer_edit=True, prefer_nohf=None) -> {'md','json'} # - prefer_ui: True 时根据当前 UI 的 NOHF/来源选择内容 # - prefer_edit: True 时优先用编辑内容(若存在) # - prefer_nohf: 显式指定是否使用 NOHF(覆盖 UI),None 表示跟随 UI # api.list_paths(rid) -> { # 'temp_dir': str, 'session_id': str, # 'result': {'md':path,'md_nohf':path,'json':path,'layout':path or None,'image':path or None}, # 'edited': {'md':path or None,'md_nohf':path or None,'json':path or None} # } # api.path_exists(path) -> bool 判断路径是否存在 # api.build_export(name='custom') -> ExportBuilder # ExportBuilder: # .add_text('dir/file.md', '...') 写入文本 # .add_bytes('bin/data.bin', b'...') 写入二进制 # .add_file('/abs/path/file.md', 'dir/file.md') 拷贝已有文件 # .mkdir('subdir/') 创建目录 # .finalize() -> zip_path 打包为 zip 并返回路径 # # 约定: 定义 main(api) 并返回以下之一: # - ExportBuilder 实例(将自动 finalize) # - 目录路径或文件路径(目录将被打包为 zip) # - None(若存在变量 export=ExportBuilder,将自动 finalize) # # 示例:按 UI 所见优先使用“编辑源码”与 NOHF,导出每个结果的 md/json,同时附带原始与编辑文件 def main(api): ids = api.get_ids() eb = api.build_export('custom_export') for i, rid in enumerate(ids, start=1): st = api.get_status(rid) if st['status'] != 'done': continue choice = api.choose_texts(rid, prefer_ui=True, prefer_edit=True) eb.add_text(f'result_{i}_{rid}/content.md', choice['md'] or '') eb.add_text(f'result_{i}_{rid}/data.json', choice['json'] or '{}') paths = api.list_paths(rid) # 附带原始文件 for p in (paths.get('result') or {}).values(): if p and api.path_exists(p): name = Path(p).name eb.add_file(p, f'result_{i}_{rid}/raw/{name}') # 附带编辑文件 for p in (paths.get('edited') or {}).values(): if p and api.path_exists(p): name = Path(p).name eb.add_file(p, f'result_{i}_{rid}/edited/{name}') return eb """ # ---------------- Helpers ---------------- def read_image_v2(img): """Read image from URL or local path / PIL.Image. Supports file paths and URLs.""" if isinstance(img, Image.Image): return img if isinstance(img, str) and img.startswith(("http://", "https://")): with requests.get(img, stream=True) as r: r.raise_for_status() return Image.open(io.BytesIO(r.content)).convert("RGB") if isinstance(img, str) and os.path.exists(img): return Image.open(img).convert("RGB") try: img_res = read_image(img, use_native=True) if isinstance(img_res, tuple) and isinstance(img_res[0], Image.Image): return img_res[0] except Exception: pass raise ValueError(f"Unsupported image input: {type(img)} / {repr(img)[:200]}") def create_temp_session_dir(): session_id = uuid.uuid4().hex[:8] temp_dir = os.path.join(tempfile.gettempdir(), f"dots_ocr_demo_{session_id}") os.makedirs(temp_dir, exist_ok=True) return temp_dir, session_id def classify_parse_failure(exc, min_p, max_p): """Return a user-friendly error message for known failure causes.""" msg = str(exc) reasons = [] # Absolute & semantic constraints if min_p < ABS_MIN_PIXELS: reasons.append( f"Min Pixels 过小:{min_p},必须 >= {ABS_MIN_PIXELS}。建议提高 Min Pixels。" ) if max_p > ABS_MAX_PIXELS: reasons.append( f"Max Pixels 过大:{max_p},必须 <= {ABS_MAX_PIXELS}。建议降低 Max Pixels。" ) if min_p >= max_p: reasons.append( f"像素参数不合法:Min Pixels({min_p}) >= Max Pixels({max_p}),必须满足 Min Pixels < Max Pixels。" ) lower = msg.lower() if "no results returned from parser" in lower or "no results returned" in lower: reasons.append( "解析未返回结果。可能原因:图像过小、Min Pixels 设置过小或过滤过强。" f"建议:Min Pixels >= {ABS_MIN_PIXELS} 且 Max Pixels <= {ABS_MAX_PIXELS}。" ) if "failed to read input" in lower or "cannot identify image file" in lower: reasons.append("无法读取输入文件,请确认文件是否为有效图片或PDF。") if ("connection" in lower and "refused" in lower) or ("connectionerror" in lower): reasons.append("无法连接后端推理服务,请检查 Server IP/Port 与服务状态。") if not reasons: reasons.append(f"未知错误:{msg}") detail = "\n".join(f"- {r}" for r in reasons) cfg = f"(当前参数:min_pixels={min_p}, max_pixels={max_p})" return f"解析失败:\n{detail}\n{cfg}" def _is_transient_backend_error(exc: Exception): lower = str(exc).lower() # Common signals: connection refused/reset, timeout, gateway, service unavailable keywords = [ "connection refused", "connectionerror", "timeout", "timed out", "gateway", "service unavailable", "failed to establish a new connection", "max retries exceeded", "read timeout", "connect timeout", ] return any(k in lower for k in keywords) def parse_image_with_high_level_api(parser, image, prompt_mode, fitz_preprocess=False): """ Calls parser.parse_image with a PIL image (or accepts image path if parser expects path). Returns dictionary with artifacts. Keeps a temp PNG of the input for traceability. """ temp_dir, session_id = create_temp_session_dir() if not isinstance(image, Image.Image): image = read_image_v2(image) temp_image_path = os.path.join(temp_dir, f"input_{session_id}.png") image.save(temp_image_path, "PNG") filename = f"demo_{session_id}" results = parser.parse_image( input_path=image, filename=filename, prompt_mode=prompt_mode, save_dir=temp_dir, fitz_preprocess=fitz_preprocess, ) if not results: raise RuntimeError("No results returned from parser") result = results[0] layout_image = None if result.get("layout_image_path") and os.path.exists(result["layout_image_path"]): try: layout_image = Image.open(result["layout_image_path"]).convert("RGB") except Exception: layout_image = None cells_data = None if result.get("layout_info_path") and os.path.exists(result["layout_info_path"]): with open(result["layout_info_path"], "r", encoding="utf-8") as f: cells_data = json.load(f) md_content = None if result.get("md_content_path") and os.path.exists(result["md_content_path"]): with open(result["md_content_path"], "r", encoding="utf-8") as f: md_content = f.read() md_content_nohf = None if result.get("md_content_nohf_path") and os.path.exists( result["md_content_nohf_path"] ): with open(result["md_content_nohf_path"], "r", encoding="utf-8") as f: md_content_nohf = f.read() json_code = "" if cells_data is not None: try: json_code = json.dumps(cells_data, ensure_ascii=False, indent=2) except Exception: json_code = str(cells_data) return { "original_image": image, "layout_image": layout_image, "cells_data": cells_data, "md_content": md_content, "md_content_nohf": md_content_nohf, "json_code": json_code, "filtered": result.get("filtered", False), "temp_dir": temp_dir, "session_id": session_id, "result_paths": result, "input_width": result.get("input_width", 0), "input_height": result.get("input_height", 0), "input_temp_path": temp_image_path, } def _validate_pixels(min_p, max_p): """Coerce pixel parameters. Do NOT auto-swap; semantic errors are handled by pre-validation.""" try: min_p = int(min_p) except Exception: min_p = DEFAULT_CONFIG["min_pixels"] try: max_p = int(max_p) except Exception: max_p = DEFAULT_CONFIG["max_pixels"] if min_p <= 0: min_p = DEFAULT_CONFIG["min_pixels"] if max_p <= 0: max_p = DEFAULT_CONFIG["max_pixels"] return min_p, max_p def _set_parser_config(server_ip, server_port, min_pixels, max_pixels): min_pixels, max_pixels = _validate_pixels(min_pixels, max_pixels) current_config.update( { "ip": server_ip, "port_vllm": int(server_port), "min_pixels": min_pixels, "max_pixels": max_pixels, } ) dots_parser.ip = server_ip dots_parser.port = int(server_port) dots_parser.min_pixels = min_pixels dots_parser.max_pixels = max_pixels def purge_queue(rid): """Best-effort remove tasks matching rid from queue.""" pending = [] try: while True: task = TASK_QUEUE.get_nowait() if task and isinstance(task, tuple): if task[0] != rid: pending.append(task) TASK_QUEUE.task_done() except queue.Empty: pass for t in pending: TASK_QUEUE.put(t) # ---------------- Export helpers ---------------- def export_one_rid(rid): st = RESULTS_CACHE.get(rid) if not st: return None temp_dir = st.get("temp_dir") if not temp_dir or not os.path.isdir(temp_dir): return None out_dir, _sess = create_temp_session_dir() zip_path = os.path.join(out_dir, f"export_{rid}.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for rt, _, files in os.walk(temp_dir): for f in files: src = os.path.join(rt, f) rel = os.path.relpath(src, temp_dir) zf.write(src, os.path.join(f"result_{rid}", rel)) return zip_path def ensure_export_ready(rid): """Create and cache export zip path if not present.""" st = RESULTS_CACHE.get(rid) or {} if not st or st.get("status") != "done": return None path = st.get("export_path") if path and os.path.exists(path): return path path = export_one_rid(rid) if path: st["export_path"] = path RESULTS_CACHE[rid] = st return path # ---------------- Script API & execution ---------------- class ExportBuilder: def __init__(self, name=None): root, sid = create_temp_session_dir() sub = f"script_export_{sid}" if name: sub = f"{name}_{sid}" self.root_dir = os.path.join(root, sub) os.makedirs(self.root_dir, exist_ok=True) self._final_zip = None def _abspath(self, rel_path: str): rel_path = rel_path.lstrip("/\\") return os.path.join(self.root_dir, rel_path) def mkdir(self, rel_dir: str): p = self._abspath(rel_dir) os.makedirs(p, exist_ok=True) return p def add_text(self, rel_path: str, content: str, encoding: str = "utf-8"): p = self._abspath(rel_path) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, "w", encoding=encoding) as f: f.write("" if content is None else str(content)) return p def add_bytes(self, rel_path: str, data: bytes): p = self._abspath(rel_path) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, "wb") as f: f.write(data or b"") return p def add_file(self, src_path: str, dest_rel_path: str = None): if not src_path or not os.path.exists(src_path): return None dest_rel_path = dest_rel_path or os.path.basename(src_path) p = self._abspath(dest_rel_path) os.makedirs(os.path.dirname(p), exist_ok=True) shutil.copy2(src_path, p) return p def finalize(self, zip_name: str = None): if self._final_zip and os.path.exists(self._final_zip): return self._final_zip out_dir, sid = create_temp_session_dir() zip_name = zip_name or f"script_export_{sid}.zip" zip_path = os.path.join(out_dir, zip_name) with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for rt, _, files in os.walk(self.root_dir): for f in files: src = os.path.join(rt, f) rel = os.path.relpath(src, self.root_dir) zf.write(src, rel) self._final_zip = zip_path return zip_path class ScriptAPI: def __init__(self, ids_snapshot): self._ids = list(ids_snapshot or []) def get_ids(self): return list(self._ids) def get_status(self, rid: str): st = dict(RESULTS_CACHE.get(rid) or {}) ui = dict(st.get("ui") or {}) return { "status": st.get("status", "pending"), "ui": { "tab": ui.get("tab", "md"), "nohf": bool(ui.get("nohf", False)), "source": ui.get("source", "源码"), }, "filtered": bool(st.get("filtered", False)), "input_width": int(st.get("input_width", 0) or 0), "input_height": int(st.get("input_height", 0) or 0), } def get_texts(self, rid: str): st = dict(RESULTS_CACHE.get(rid) or {}) edits = dict(st.get("edits") or {}) return { "md": st.get("md_content") or "", "md_nohf": st.get("md_content_nohf") or "", "json": st.get("json_code") or "", "md_edit": edits.get("md"), "md_nohf_edit": edits.get("nohf"), "json_edit": edits.get("json"), } def choose_texts( self, rid: str, prefer_ui: bool = True, prefer_edit: bool = True, prefer_nohf: bool | None = None, ): st = dict(RESULTS_CACHE.get(rid) or {}) ui = dict(st.get("ui") or {}) # UI 指示 ui_nohf = bool(ui.get("nohf", False)) ui_source_is_edit = str(ui.get("source", "源码")) == "编辑源码" # 选择 nohf use_nohf = ui_nohf if prefer_nohf is None else bool(prefer_nohf) # 选择是否优先编辑 prefer_edit_final = bool(prefer_edit or (prefer_ui and ui_source_is_edit)) t = self.get_texts(rid) # Markdown md_orig = t["md_nohf"] if use_nohf else t["md"] md_edit = t["md_nohf_edit"] if use_nohf else t["md_edit"] md = (md_edit if (prefer_edit_final and md_edit is not None) else md_orig) or "" # JSON json_text = ( t["json_edit"] if (prefer_edit_final and t.get("json_edit") is not None) else t["json"] ) or "" return {"md": md, "json": json_text} def list_paths(self, rid: str): st = dict(RESULTS_CACHE.get(rid) or {}) rp = dict(st.get("result_paths") or {}) md_p = rp.get("md_content_path") nohf_p = rp.get("md_content_nohf_path") json_p = rp.get("layout_info_path") or rp.get("json_path") image_p = rp.get("layout_image_path") or None # 编辑路径(若存在) edited_md = None edited_nohf = None edited_json = None try: edited_md = _edited_filepath(st, "md") if not os.path.exists(edited_md): edited_md = None except Exception: edited_md = None try: edited_nohf = _edited_filepath(st, "nohf") if not os.path.exists(edited_nohf): edited_nohf = None except Exception: edited_nohf = None try: edited_json = _edited_filepath(st, "json") if not os.path.exists(edited_json): edited_json = None except Exception: edited_json = None return { "temp_dir": st.get("temp_dir"), "session_id": st.get("session_id"), "result": { "md": md_p if (md_p and os.path.exists(md_p)) else None, "md_nohf": nohf_p if (nohf_p and os.path.exists(nohf_p)) else None, "json": json_p if (json_p and os.path.exists(json_p)) else None, "layout": image_p if (image_p and os.path.exists(image_p)) else None, "input_image": ( st.get("input_temp_path") if ( st.get("input_temp_path") and os.path.exists(st.get("input_temp_path")) ) else None ), }, "edited": { "md": edited_md, "md_nohf": edited_nohf, "json": edited_json, }, } def path_exists(self, p: str) -> bool: try: return bool(p) and os.path.exists(p) except Exception: return False def build_export(self, name: str | None = None): return ExportBuilder(name=name) def _safe_builtins(): base = ( __builtins__ if isinstance(__builtins__, dict) else getattr(__builtins__, "__dict__", {}) ) allow = [ "abs", "min", "max", "sum", "len", "range", "enumerate", "map", "filter", "zip", "list", "dict", "set", "tuple", "str", "int", "float", "bool", "print", "any", "all", "sorted", ] return {k: base[k] for k in allow if k in base} def run_user_script(script_code: str, ids_snapshot): """ 非流式执行用户脚本,捕获标准输出并返回(zip_path, logs)。 """ api = ScriptAPI(ids_snapshot) ns = { "__builtins__": _safe_builtins(), "api": api, "json": json, "re": re, "math": math, "datetime": datetime, "Path": Path, "io": io, "ExportBuilder": ExportBuilder, } import contextlib from io import StringIO buf = StringIO() zip_path = None try: code = script_code or "" with contextlib.redirect_stdout(buf): exec(code, ns, ns) result = None main_fn = ns.get("main") if callable(main_fn): result = main_fn(api) else: result = ns.get("RESULT") or ns.get("OUTPUT_PATH") if isinstance(result, ExportBuilder): zip_path = result.finalize() elif isinstance(result, str) and result: if os.path.isdir(result): eb = ExportBuilder("script_dir_export") for rt, _, files in os.walk(result): for f in files: src = os.path.join(rt, f) rel = os.path.relpath(src, result) eb.add_file(src, rel) zip_path = eb.finalize() elif os.path.exists(result): zip_path = result if not zip_path: exp = ns.get("export") if isinstance(exp, ExportBuilder): zip_path = exp.finalize() except Exception as e: err = f"[Script Error] {type(e).__name__}: {e}" return None, (buf.getvalue() + "\n" + err) return ( zip_path if (zip_path and os.path.exists(zip_path)) else None ), buf.getvalue() def run_user_script_stream(script_code: str, ids_snapshot): """生成器:实时输出日志,并在结束时返回下载地址与完成状态。""" # 日志队列 log_q = queue.Queue() def _emit(kind, payload=None): log_q.put((kind, payload)) def debug(*args, **kwargs): text = " ".join(str(a) for a in args) if text: _emit("log", text) # 准备脚本命名空间(与非流式版本一致,但覆盖 print/debug) api = ScriptAPI(ids_snapshot) ns = { "__builtins__": _safe_builtins(), "api": api, "json": json, "re": re, "math": math, "datetime": datetime, "Path": Path, "io": io, "ExportBuilder": ExportBuilder, # 专用日志函数 "debug": debug, "print": debug, } result_holder = {"zip_path": None, "error": None} def _worker(): try: code = script_code or "" exec(code, ns, ns) res = None main_fn = ns.get("main") if callable(main_fn): res = main_fn(api) else: res = ns.get("RESULT") or ns.get("OUTPUT_PATH") zip_path = None if isinstance(res, ExportBuilder): zip_path = res.finalize() elif isinstance(res, str) and res: if os.path.isdir(res): eb = ExportBuilder("script_dir_export") for rt, _, files in os.walk(res): for f in files: src = os.path.join(rt, f) rel = os.path.relpath(src, res) eb.add_file(src, rel) zip_path = eb.finalize() elif os.path.exists(res): zip_path = res if not zip_path: exp = ns.get("export") if isinstance(exp, ExportBuilder): zip_path = exp.finalize() result_holder["zip_path"] = ( zip_path if (zip_path and os.path.exists(zip_path)) else None ) except Exception as e: result_holder["error"] = f"[Script Error] {type(e).__name__}: {e}" finally: _emit("done", None) # 启动脚本线程 t = threading.Thread(target=_worker, daemon=True) t.start() # 初始状态显示 spinner_html = ( "
, outer container hidden overflow */
.script-log { max-height: 260px; overflow: hidden; border:1px solid #1f2937; border-radius:6px; padding:0; }
.script-log pre {
max-height: 260px;
overflow: auto;
margin: 0;
padding: 6px;
background: transparent;
scrollbar-width: thin; /* Firefox */
scrollbar-color: rgba(255,255,255,0.2) transparent;
}
.script-log pre::-webkit-scrollbar { width: 6px; height: 6px; }
.script-log pre::-webkit-scrollbar-track { background: transparent; }
.script-log pre::-webkit-scrollbar-thumb { background: rgba(255,255,255,0.12); border-radius: 4px; }
.script-log pre:hover::-webkit-scrollbar-thumb { background: rgba(255,255,255,0.25); }
"""
with gr.Blocks(css=css, title="dots.ocr") as demo:
# Left column controls
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(
label="Upload Multiple Images",
type="filepath",
file_count="multiple",
file_types=[".jpg", ".jpeg", ".png", ".pdf"],
)
# Filter out the unwanted 'prompt_grounding_ocr' mode
allowed_modes = [
m
for m in dict_promptmode_to_prompt.keys()
if m != "prompt_grounding_ocr"
]
if not allowed_modes:
allowed_modes = list(dict_promptmode_to_prompt.keys())
prompt_mode = gr.Dropdown(
label="Prompt Mode",
choices=allowed_modes,
value=allowed_modes[0],
)
prompt_display = gr.Textbox(
label="Prompt Preview",
value=dict_promptmode_to_prompt[allowed_modes[0]],
interactive=False,
lines=4,
)
with gr.Row():
parse_btn = gr.Button("🔍 Parse", variant="primary")
clear_btn = gr.Button("🗑️ Clear")
with gr.Accordion("Advanced Config", open=False):
fitz_preprocess = gr.Checkbox(label="fitz_preprocess", value=True)
server_ip = gr.Textbox(
label="Server IP", value=DEFAULT_CONFIG["ip"]
)
server_port = gr.Number(
label="Port", value=DEFAULT_CONFIG["port_vllm"], precision=0
)
min_pixels = gr.Number(
label="Min Pixels", value=DEFAULT_CONFIG["min_pixels"]
)
max_pixels = gr.Number(
label="Max Pixels", value=DEFAULT_CONFIG["max_pixels"]
)
concurrency = gr.Number(
label="Max Concurrency",
value=MAX_CONCURRENCY, # 与实际生效的后台并发保持一致(支持刷新后保持)
precision=0,
interactive=True,
)
confirm_delete = gr.Checkbox(
label="删除前确认(推荐)", value=True, interactive=True
)
# Right column: results & actions
with gr.Column(scale=5):
info_display = gr.Markdown("Waiting...", elem_id="info_box")
ids_state = gr.State(value=[])
store_tick = gr.State(value=0)
render_bump = gr.State(value=0) # 仅用于在状态变化时触发结果重渲染
confirm_delete_state = gr.State(value=True)
confirm_delete.change(
lambda v: v, inputs=[confirm_delete], outputs=[confirm_delete_state]
)
progress_timer = gr.Timer(1.0)
# Actions 面板(多选)
with gr.Accordion("Actions", open=False):
selected_group = gr.CheckboxGroup(
label="Select Items", choices=[], value=[], interactive=True
)
with gr.Row():
select_all_btn = gr.Button("全选")
clear_sel_btn = gr.Button("清空选择")
with gr.Row():
bulk_reparse_btn = gr.Button("🔁 重解析所选")
delete_selected_btn = gr.Button("🗑️ 删除所选", variant="stop")
export_selected_btn = gr.DownloadButton("📦 导出所选")
# 高级脚本导出
with gr.Accordion("高级脚本", open=False):
gr.Markdown(
"在下方编辑并运行自定义 Python 脚本以自由处理当前解析结果并导出为任意目录/文件结构。"
"
脚本将在受限环境中执行,可通过 api 对象访问只读数据与构建导出压缩包。",
elem_classes=["muted"],
)
script_code = gr.Code(
label="Python 脚本",
language="python",
value=DEFAULT_SCRIPT_TEMPLATE,
lines=24,
interactive=True,
)
with gr.Row():
run_script_btn = gr.Button("▶ 运行脚本", variant="primary")
script_download_btn = gr.DownloadButton("📦 下载脚本输出")
script_status = gr.HTML("")
script_log = gr.Markdown(
"", elem_id="script_log", elem_classes=["script-log"]
)
# 流式执行脚本:实时打印日志与运行状态,并在完成后绑定下载按钮
run_script_btn.click(
run_user_script_stream,
inputs=[script_code, ids_state],
outputs=[script_download_btn, script_status, script_log],
show_progress="hidden",
)
# 批量删除确认面板
with gr.Row(visible=False) as bulk_delete_confirm_panel:
gr.Markdown(
"确认删除所选结果?该操作不可恢复。",
elem_classes=["muted"],
)
bulk_confirm_delete_btn = gr.Button("确认删除", variant="stop")
bulk_cancel_delete_btn = gr.Button("取消")
# Render results dynamically
@gr.render(inputs=[ids_state, render_bump])
def render_results(ids, _bump):
if not ids:
return gr.Markdown("No results yet.")
with gr.Column():
for idx, rid in enumerate(ids):
data = RESULTS_CACHE.get(rid, {}) or {}
status = data.get("status", "pending")
# 确保每张卡都有独立 UI 状态(并写回缓存,保证后续使用)
ui = _ensure_ui_state(rid)
preview_on = bool(ui.get("preview", True))
nohf_on = bool(ui.get("nohf", False))
active_tab = ui.get("tab", "md")
if active_tab not in ("md", "json"):
active_tab = "md"
source_sel = ui.get("source", "源码")
if source_sel not in ("源码", "编辑源码"):
source_sel = "源码"
with gr.Column(
elem_classes=["result-card"], elem_id=f"card-{rid}"
):
with gr.Row():
gr.Markdown(
f"### Result {idx+1} RID: {rid}"
)
if status == "error":
gr.Markdown(
f"⚠️ 解析失败:\n\n{data.get('md_content','Unknown error')}",
elem_classes=["muted"],
)
if status == "done":
orig_img = data.get("original_image")
layout_img = data.get("layout_image")
with gr.Row():
gr.Image(
value=orig_img, label="Original", height=300
)
gr.Image(
value=layout_img, label="Layout", height=300
)
elif status == "pending":
with gr.Row():
gr.HTML(
""
)
gr.HTML(
""
)
# badges
with gr.Row():
badge_md = gr.HTML(
f"MD: {'Preview' if preview_on else 'Source'}"
)
badge_nohf = gr.HTML(
f"NOHF: {'On' if nohf_on else 'Off'}"
)
# controls
with gr.Row():
rid_box = gr.Textbox(value=rid, visible=False)
preview_cb = gr.Checkbox(
label="Preview Markdown",
value=preview_on,
)
nohf_cb = gr.Checkbox(label="NOHF", value=nohf_on)
# 视图切换
selected_label = (
"Markdown" if active_tab == "md" else "JSON"
)
with gr.Row():
view_radio = gr.Radio(
label="视图",
choices=["Markdown", "JSON"],
value=selected_label,
)
# 内容来源(仅完成状态可用)
with gr.Row():
source_radio = gr.Radio(
label="内容来源",
choices=["源码", "编辑源码"],
value=source_sel,
interactive=True,
visible=(status == "done"),
)
# 内容获取助手
def _get_texts(rid_value, nohf_flag):
st = RESULTS_CACHE.get(rid_value, {}) or {}
md_orig = st.get("md_content") or ""
md_nohf_orig = st.get("md_content_nohf") or ""
md_current_orig = (
md_nohf_orig if nohf_flag else md_orig
)
edits = st.get("edits") or {}
md_edit = (
edits.get("nohf")
if nohf_flag
else edits.get("md")
)
if md_edit is None:
md_edit = md_current_orig
json_orig = st.get("json_code") or ""
json_edit = edits.get("json")
if json_edit is None:
json_edit = json_orig
return (
md_current_orig,
md_edit,
json_orig,
json_edit,
)
(
md_orig_val,
md_edit_val,
json_orig_val,
json_edit_val,
) = _get_texts(rid, nohf_on)
is_md_init = selected_label == "Markdown"
use_edit_init = source_sel == "编辑源码"
# 单一预览组件(Markdown 用)
md_preview = gr.Markdown(
value=(
md_edit_val if use_edit_init else md_orig_val
),
visible=(
status == "done" and is_md_init and preview_on
),
)
# 原始源码(只读)
md_code_orig = gr.Code(
language="markdown",
value=md_orig_val,
interactive=False,
visible=(
status == "done"
and is_md_init
and (not preview_on)
and (not use_edit_init)
),
)
# 编辑源码(可编辑、自动保存)
md_code_edit = gr.Code(
language="markdown",
value=md_edit_val,
interactive=True,
visible=(
status == "done"
and is_md_init
and (not preview_on)
and use_edit_init
),
)
# JSON(原始与编辑)
json_code_orig = gr.Code(
language="json",
value=json_orig_val,
interactive=False,
visible=(
status == "done"
and (not is_md_init)
and (not use_edit_init)
),
)
json_code_edit = gr.Code(
language="json",
value=json_edit_val,
interactive=True,
visible=(
status == "done"
and (not is_md_init)
and use_edit_init
),
)
# 仅编辑模式显示
restore_btn = gr.Button(
"还原当前内容",
visible=(status == "done" and use_edit_init),
)
# 统一可见性/内容更新
def _apply_all(
preview, use_nohf, view_label, src_label, rid_value
):
preview = bool(preview)
use_nohf = bool(use_nohf)
is_md = str(view_label) == "Markdown"
use_edit = str(src_label) == "编辑源码"
# 写回 UI 状态
st = RESULTS_CACHE.get(rid_value, {}) or {}
ui0 = dict(st.get("ui") or _default_ui_state())
ui0["preview"] = preview
ui0["nohf"] = use_nohf
ui0["tab"] = "md" if is_md else "json"
ui0["source"] = "编辑源码" if use_edit else "源码"
st["ui"] = ui0
RESULTS_CACHE[rid_value] = st
md_o, md_e, j_o, j_e = _get_texts(
rid_value, use_nohf
)
return (
gr.update(
value=f"MD: {'Preview' if preview else 'Source'}"
),
gr.update(
value=f"NOHF: {'On' if use_nohf else 'Off'}"
),
gr.update(
value=(md_e if use_edit else md_o),
visible=(is_md and preview),
),
gr.update(
value=md_o,
visible=(
is_md
and (not preview)
and (not use_edit)
),
),
gr.update(
value=md_e,
visible=(
is_md and (not preview) and use_edit
),
),
gr.update(
value=j_o,
visible=(not is_md and (not use_edit)),
),
gr.update(
value=j_e, visible=(not is_md and use_edit)
),
gr.update(visible=use_edit),
)
# 绑定控制项变化:预览、NOHF、视图、来源
preview_cb.change(
_apply_all,
inputs=[
preview_cb,
nohf_cb,
view_radio,
source_radio,
rid_box,
],
outputs=[
badge_md,
badge_nohf,
md_preview,
md_code_orig,
md_code_edit,
json_code_orig,
json_code_edit,
restore_btn,
],
show_progress="hidden",
)
nohf_cb.change(
_apply_all,
inputs=[
preview_cb,
nohf_cb,
view_radio,
source_radio,
rid_box,
],
outputs=[
badge_md,
badge_nohf,
md_preview,
md_code_orig,
md_code_edit,
json_code_orig,
json_code_edit,
restore_btn,
],
show_progress="hidden",
)
def _on_view_change(
view_label,
rid_value,
preview_flag,
nohf_flag,
src_label,
):
st = RESULTS_CACHE.get(rid_value, {}) or {}
ui0 = dict(st.get("ui") or _default_ui_state())
ui0["tab"] = (
"md"
if str(view_label) == "Markdown"
else "json"
)
st["ui"] = ui0
RESULTS_CACHE[rid_value] = st
return _apply_all(
preview_flag,
nohf_flag,
view_label,
src_label,
rid_value,
)
view_radio.change(
_on_view_change,
inputs=[
view_radio,
rid_box,
preview_cb,
nohf_cb,
source_radio,
],
outputs=[
badge_md,
badge_nohf,
md_preview,
md_code_orig,
md_code_edit,
json_code_orig,
json_code_edit,
restore_btn,
],
show_progress="hidden",
)
def _on_source_change(
src_label,
rid_value,
preview_flag,
nohf_flag,
view_label,
):
st = RESULTS_CACHE.get(rid_value, {}) or {}
ui0 = dict(st.get("ui") or _default_ui_state())
ui0["source"] = (
"编辑源码"
if str(src_label) == "编辑源码"
else "源码"
)
st["ui"] = ui0
RESULTS_CACHE[rid_value] = st
return _apply_all(
preview_flag,
nohf_flag,
view_label,
src_label,
rid_value,
)
source_radio.change(
_on_source_change,
inputs=[
source_radio,
rid_box,
preview_cb,
nohf_cb,
view_radio,
],
outputs=[
badge_md,
badge_nohf,
md_preview,
md_code_orig,
md_code_edit,
json_code_orig,
json_code_edit,
restore_btn,
],
show_progress="hidden",
)
# Action buttons per-card
with gr.Row():
reparse_btn = gr.Button(
"🔁 重新解析",
interactive=(status in ("done", "error")),
)
export_btn = gr.DownloadButton(
"📦 导出",
interactive=(status == "done"),
value=(
data.get("export_path")
if status == "done"
else None
),
)
delete_btn = gr.Button("🗑️ 删除", variant="stop")
# 自动保存(编辑器变更即写盘 + 刷新导出 + 可能的 Markdown 预览)
def _save_md_edit(
val,
rid_value,
nohf_flag,
preview_flag,
view_label,
src_label,
ids,
selected_labels,
):
st = RESULTS_CACHE.get(rid_value, {}) or {}
if st.get("status") != "done":
# 同步“导出所选”以防其它项在编辑(极少见)
path_sel = export_selected_rids(
ids, selected_labels
)
return (
gr.update(),
gr.update(),
gr.update(value=path_sel),
)
which = "nohf" if bool(nohf_flag) else "md"
edits = dict(st.get("edits") or {})
edits[which] = val or ""
st["edits"] = edits
RESULTS_CACHE[rid_value] = st
try:
_save_edited_to_disk(st, which, val or "")
except Exception:
pass
_invalidate_export_zip(rid_value)
new_zip = ensure_export_ready(rid_value)
# 刷新“导出所选”
path_sel = export_selected_rids(
ids, selected_labels
)
# 若当前正处于 Markdown/预览/编辑模式,则更新预览内容
is_md = str(view_label) == "Markdown"
use_edit = str(src_label) == "编辑源码"
if is_md and use_edit and bool(preview_flag):
return (
gr.update(value=val or ""),
gr.update(value=new_zip),
gr.update(value=path_sel),
)
return (
gr.update(),
gr.update(value=new_zip),
gr.update(value=path_sel),
)
md_code_edit.change(
_save_md_edit,
inputs=[
md_code_edit,
rid_box,
nohf_cb,
preview_cb,
view_radio,
source_radio,
ids_state,
selected_group,
],
outputs=[
md_preview,
export_btn,
export_selected_btn,
],
show_progress="hidden",
)
def _save_json_edit(
val, rid_value, ids, selected_labels
):
st = RESULTS_CACHE.get(rid_value, {}) or {}
if st.get("status") != "done":
path_sel = export_selected_rids(
ids, selected_labels
)
return gr.update(), gr.update(value=path_sel)
edits = dict(st.get("edits") or {})
edits["json"] = val or ""
st["edits"] = edits
RESULTS_CACHE[rid_value] = st
try:
_save_edited_to_disk(st, "json", val or "")
except Exception:
pass
_invalidate_export_zip(rid_value)
new_zip = ensure_export_ready(rid_value)
path_sel = export_selected_rids(
ids, selected_labels
)
return gr.update(value=new_zip), gr.update(
value=path_sel
)
json_code_edit.change(
_save_json_edit,
inputs=[
json_code_edit,
rid_box,
ids_state,
selected_group,
],
outputs=[export_btn, export_selected_btn],
show_progress="hidden",
)
# 还原当前内容
def _restore_current(
src_label,
rid_value,
nohf_flag,
preview_flag,
view_label,
ids,
selected_labels,
):
st = RESULTS_CACHE.get(rid_value, {}) or {}
which = (
"json"
if str(view_label) == "JSON"
else ("nohf" if bool(nohf_flag) else "md")
)
# 删除编辑版
edits = dict(st.get("edits") or {})
if which in edits:
edits.pop(which, None)
st["edits"] = edits
RESULTS_CACHE[rid_value] = st
try:
_delete_edited_from_disk(st, which)
except Exception:
pass
# 重新取原始内容
md_o, md_e, j_o, j_e = _get_texts(
rid_value, bool(nohf_flag)
)
# 刷新导出
_invalidate_export_zip(rid_value)
new_zip = ensure_export_ready(rid_value)
path_sel = export_selected_rids(
ids, selected_labels
)
# 更新编辑器与预览
up_md_editor = (
gr.update(value=md_o)
if which in ("md", "nohf")
else gr.update()
)
up_json_editor = (
gr.update(value=j_o)
if which == "json"
else gr.update()
)
is_md = str(view_label) == "Markdown"
use_edit = str(src_label) == "编辑源码"
up_preview = (
gr.update(value=(md_e if use_edit else md_o))
if is_md and bool(preview_flag)
else gr.update()
)
return (
up_md_editor,
up_json_editor,
up_preview,
gr.update(value=new_zip),
gr.update(value=path_sel),
)
restore_btn.click(
_restore_current,
inputs=[
source_radio,
rid_box,
nohf_cb,
preview_cb,
view_radio,
ids_state,
selected_group,
],
outputs=[
md_code_edit,
json_code_edit,
md_preview,
export_btn,
export_selected_btn,
],
show_progress="hidden",
)
# Reparse panel (collapsed)
with gr.Column(visible=False) as reparse_panel:
gr.Markdown("**重解析**")
with gr.Row():
reparse_current_btn = gr.Button(
"基于当前图片直接重解析", variant="primary"
)
# Delete confirm panel (collapsed)
with gr.Row(visible=False) as delete_confirm_panel:
gr.Markdown(
"确认删除该结果?该操作不可恢复。",
elem_classes=["muted"],
)
confirm_delete_btn = gr.Button(
"确认删除", variant="stop"
)
cancel_delete_btn = gr.Button("取消")
# 绑定其他交互
reparse_btn.click(
lambda: gr.update(visible=True),
outputs=[reparse_panel],
show_progress="hidden",
)
def _start_reparse_current(
rid_value,
p_mode,
ip_addr,
port_val,
minp,
maxp,
fitz_flag,
tick,
ids,
selected_labels,
):
try:
enqueue_single_reparse(
rid_value,
None,
p_mode,
ip_addr,
int(port_val),
int(minp),
int(maxp),
fitz_flag,
)
# 重建“导出所选”
path_sel = export_selected_rids(
ids, selected_labels
)
return (
int(tick or 0) + 1,
gr.update(visible=False),
gr.update(value=path_sel),
)
except Exception as e:
RESULTS_CACHE[rid_value] = {
"status": "error",
"md_content": f"Reparse error: {e}",
# 保留 UI 状态
"ui": _ensure_ui_state(rid_value),
}
path_sel = export_selected_rids(
ids, selected_labels
)
return (
int(tick or 0) + 1,
gr.update(visible=False),
gr.update(value=path_sel),
)
reparse_current_btn.click(
_start_reparse_current,
inputs=[
rid_box,
prompt_mode,
server_ip,
server_port,
min_pixels,
max_pixels,
fitz_preprocess,
store_tick,
ids_state,
selected_group,
],
outputs=[
store_tick,
reparse_panel,
export_selected_btn,
],
show_progress="hidden",
)
def _on_delete_click(
rid_value, ids, need_confirm, tick
):
# 如果需要确认,仅展开确认面板,不修改选择框/导出按钮
if need_confirm:
return (
gr.update(visible=True),
ids,
tick,
gr.update(), # selected_group 不变
gr.update(), # export button 不变
)
# 直接删除:更新 ids/tick,并同步 Actions 的选择项与导出按钮
new_ids, new_tick = delete_one(ids, rid_value, tick)
choices = [
f"Result {i+1}"
for i in range(len(new_ids or []))
]
return (
gr.update(visible=False),
new_ids,
new_tick,
gr.update(choices=choices, value=[]),
gr.update(value=None), # 清空导出
)
# 单卡删除输出同步 selected_group 与 export_selected_btn
delete_btn.click(
_on_delete_click,
inputs=[
rid_box,
ids_state,
confirm_delete_state,
store_tick,
],
outputs=[
delete_confirm_panel,
ids_state,
store_tick,
selected_group,
export_selected_btn,
],
show_progress="hidden",
)
def _confirm_delete(rid_value, ids, tick):
new_ids, new_tick = delete_one(ids, rid_value, tick)
choices = [
f"Result {i+1}"
for i in range(len(new_ids or []))
]
return (
new_ids,
new_tick,
gr.update(visible=False),
gr.update(choices=choices, value=[]),
gr.update(value=None),
)
# 确认删除后同步 selected_group 与 export_selected_btn
confirm_delete_btn.click(
_confirm_delete,
inputs=[rid_box, ids_state, store_tick],
outputs=[
ids_state,
store_tick,
delete_confirm_panel,
selected_group,
export_selected_btn,
],
show_progress="hidden",
)
cancel_delete_btn.click(
lambda: gr.update(visible=False),
outputs=[delete_confirm_panel],
show_progress="hidden",
)
# Top-level events
def _on_prompt_mode_change(m):
return dict_promptmode_to_prompt.get(m, "")
prompt_mode.change(
fn=_on_prompt_mode_change,
inputs=[prompt_mode],
outputs=[prompt_display],
show_progress="hidden",
)
def process_images_simple(
file_list,
p_mode,
server_ip_val,
server_port_val,
min_p_val,
max_p_val,
fitz_val,
cur_ids,
tick,
):
"""
Process images with selected prompt mode. Grounding mode is removed; all files go through normal path.
"""
minp, maxp = _validate_pixels(min_p_val, max_p_val)
_set_parser_config(server_ip_val, server_port_val, minp, maxp)
# normalize file_list (gradio file element may pass nested lists)
files = []
if not file_list:
return (
gr.update(value=None),
gr.update(value="No files uploaded."),
cur_ids,
tick,
gr.update(choices=[], value=[]),
gr.update(value=None), # 清空导出
)
# build normalized list
for f in file_list:
if isinstance(f, (list, tuple)):
files.append(f[0] if len(f) > 0 else None)
else:
files.append(f)
# Normal path: queue originals
new_ids, info = add_tasks_to_queue(
files,
p_mode,
server_ip_val,
server_port_val,
minp,
maxp,
fitz_val,
cur_ids,
)
# Update checkbox group choices
choices = [f"Result {i+1}" for i in range(len(new_ids or []))]
return (
gr.update(value=None),
gr.update(value=info),
new_ids,
int(tick or 0) + 1,
gr.update(choices=choices, value=[]),
gr.update(value=None), # 清空导出
)
parse_btn.click(
fn=process_images_simple,
inputs=[
file_input,
prompt_mode,
server_ip,
server_port,
min_pixels,
max_pixels,
fitz_preprocess,
ids_state,
store_tick,
],
outputs=[
file_input,
info_display,
ids_state,
store_tick,
selected_group,
export_selected_btn,
],
show_progress="hidden",
)
# Concurrency change handler: apply immediately
def _on_concurrency_change(n):
try:
set_max_concurrency(int(n))
return gr.update(value=f"并发已设置为 {int(n)}。")
except Exception as e:
return gr.update(value=f"设置并发失败:{e}")
concurrency.change(
_on_concurrency_change,
inputs=[concurrency],
outputs=[info_display],
show_progress="hidden",
)
# 会话加载时同步 UI 与当前真实并发(解决刷新后 UI 值与实际不一致)
def _sync_concurrency_on_session_load():
try:
# 如有需要,补齐 worker 到目标并发数(不会减少已有线程)
_start_workers(max(1, MAX_CONCURRENCY))
return (
gr.update(value=int(MAX_CONCURRENCY)),
gr.update(
value=f"已同步当前并发为 {int(MAX_CONCURRENCY)}。"
),
)
except Exception as e:
return (
gr.update(value=int(MAX_CONCURRENCY)),
gr.update(value=f"同步并发时发生异常:{e}"),
)
demo.load(
_sync_concurrency_on_session_load,
inputs=None,
outputs=[concurrency, info_display],
)
# 生成导出 ZIP(基于当前选择),用于首次点击即可下载
def _update_export_for_selection(ids, selected_labels):
path = export_selected_rids(ids, selected_labels)
return gr.update(
value=path if path and os.path.exists(path) else None
)
# Actions: 全选/清空
def _select_all(ids):
choices = [f"Result {i+1}" for i in range(len(ids or []))]
# 预生成 zip
path = export_selected_rids(ids, choices)
return (
gr.update(choices=choices, value=choices),
gr.update(
value=path if path and os.path.exists(path) else None
),
)
def _clear_selection(ids):
choices = [f"Result {i+1}" for i in range(len(ids or []))]
return (
gr.update(choices=choices, value=[]),
gr.update(value=None),
)
select_all_btn.click(
_select_all,
inputs=[ids_state],
outputs=[selected_group, export_selected_btn],
show_progress="hidden",
)
clear_sel_btn.click(
_clear_selection,
inputs=[ids_state],
outputs=[selected_group, export_selected_btn],
show_progress="hidden",
)
# 当用户手动变更选择时,预构建导出 zip 并绑定到按钮
selected_group.change(
_update_export_for_selection,
inputs=[ids_state, selected_group],
outputs=[export_selected_btn],
show_progress="hidden",
)
# Actions: 批量重解析(基于当前图片)
def bulk_reparse(
selected_labels, ids, p_mode, ip, port, minp, maxp, fitz, tick
):
if not ids or not selected_labels:
path_sel = export_selected_rids(ids, selected_labels)
return (
gr.update(value="未选择任何结果。"),
int(tick or 0),
gr.update(value=path_sel),
)
# Map labels -> rids
count = 0
for label in selected_labels:
try:
idx = int(str(label).split()[-1]) - 1
rid = ids[idx]
enqueue_single_reparse(
rid,
None,
p_mode,
ip,
int(port),
int(minp),
int(maxp),
fitz,
)
count += 1
except Exception:
continue
path_sel = export_selected_rids(ids, selected_labels)
return (
gr.update(value=f"已触发 {count} 个重解析任务。"),
int(tick or 0) + 1,
gr.update(value=path_sel),
)
bulk_reparse_btn.click(
bulk_reparse,
inputs=[
selected_group,
ids_state,
prompt_mode,
server_ip,
server_port,
min_pixels,
max_pixels,
fitz_preprocess,
store_tick,
],
outputs=[info_display, store_tick, export_selected_btn],
show_progress="hidden",
)
# Actions: 删除所选(尊重“删除前确认”)
def delete_selected_action(ids, selected_labels, tick):
# 先从“原始 ids 列表”解析出要删除的 rid 列表,避免索引随删除而错位
if not ids or not selected_labels:
choices = [f"Result {i+1}" for i in range(len(ids or []))]
return (
ids,
int(tick or 0),
gr.update(choices=choices, value=[]),
gr.update(value=None),
)
# 解析 label -> index(去重、过滤非法)
sel_indices = []
for label in selected_labels:
try:
idx = int(str(label).split()[-1]) - 1
if 0 <= idx < len(ids):
sel_indices.append(idx)
except Exception:
continue
if not sel_indices:
choices = [f"Result {i+1}" for i in range(len(ids or []))]
return (
ids,
int(tick or 0),
gr.update(choices=choices, value=[]),
gr.update(value=None),
)
sel_indices = sorted(set(sel_indices))
rids_to_delete = [ids[i] for i in sel_indices]
new_ids = list(ids)
new_tick = int(tick or 0)
# 基于 rid 删除,避免受索引变化影响
for rid in rids_to_delete:
new_ids, new_tick = delete_one(new_ids, rid, new_tick)
choices = [f"Result {i+1}" for i in range(len(new_ids or []))]
return (
new_ids,
new_tick,
gr.update(choices=choices, value=[]),
gr.update(value=None),
)
def _on_bulk_delete_click(ids, selected_labels, need_confirm, tick):
if need_confirm:
# 展示确认面板,不改动任何选择与导出
return (
gr.update(visible=True),
ids,
tick,
gr.update(),
gr.update(),
)
# 直接删除并隐藏确认面板
new_ids, new_tick, sel_update, export_update = (
delete_selected_action(ids, selected_labels, tick)
)
return (
gr.update(visible=False),
new_ids,
new_tick,
sel_update,
export_update,
)
delete_selected_btn.click(
_on_bulk_delete_click,
inputs=[
ids_state,
selected_group,
confirm_delete_state,
store_tick,
],
outputs=[
bulk_delete_confirm_panel,
ids_state,
store_tick,
selected_group,
export_selected_btn,
],
show_progress="hidden",
)
def _bulk_confirm_delete(ids, selected_labels, tick):
new_ids, new_tick, sel_update, export_update = (
delete_selected_action(ids, selected_labels, tick)
)
return (
new_ids,
new_tick,
sel_update,
export_update,
gr.update(visible=False),
)
bulk_confirm_delete_btn.click(
_bulk_confirm_delete,
inputs=[ids_state, selected_group, store_tick],
outputs=[
ids_state,
store_tick,
selected_group,
export_selected_btn,
bulk_delete_confirm_panel,
],
show_progress="hidden",
)
bulk_cancel_delete_btn.click(
lambda: gr.update(visible=False),
outputs=[bulk_delete_confirm_panel],
show_progress="hidden",
)
# 进度信息
def update_progress_info(ids, tick, bump):
if not ids:
return (
gr.update(value="Waiting..."),
tick,
int(bump or 0),
)
pending = 0
done = 0
errors = 0
status_signature = []
for rid in ids:
st = RESULTS_CACHE.get(rid, {})
status = st.get("status", "pending")
status_signature.append((rid, status))
if status == "done":
done += 1
elif status == "error":
errors += 1
else:
pending += 1
qsize = TASK_QUEUE.qsize()
running = max(0, pending - qsize)
# Info text
if pending == 0:
info = (
f"进度:完成 {done}"
+ ("" if errors == 0 else f",错误 {errors}")
+ "。"
)
else:
info = f"进度:完成 {done},错误 {errors},正在解析 {running},排队 {qsize},待处理合计 {pending}。"
# Only bump render when any item's status changed
sig_tuple = tuple(status_signature)
last_sig = getattr(update_progress_info, "_last_status_sig", None)
bump_out = int(bump or 0)
if last_sig != sig_tuple:
setattr(update_progress_info, "_last_status_sig", sig_tuple)
bump_out = bump_out + 1
# Only tick when coarse counts change (avoid unnecessary churn)
key = f"{done}_{errors}_{pending}"
last_key = getattr(update_progress_info, "_last_counts_key", None)
new_tick = int(tick or 0)
if last_key != key:
setattr(update_progress_info, "_last_counts_key", key)
new_tick = new_tick + 1
return (
gr.update(value=info),
new_tick,
bump_out,
)
# 计时器不再触达 selected_group,杜绝与用户交互竞争导致选择重置/计时停止
progress_timer.tick(
fn=update_progress_info,
inputs=[ids_state, store_tick, render_bump],
outputs=[info_display, store_tick, render_bump],
show_progress="hidden",
)
# Clear all
def clear_all():
global RESULTS_CACHE
while not TASK_QUEUE.empty():
try:
TASK_QUEUE.get_nowait()
TASK_QUEUE.task_done()
except queue.Empty:
break
RESULTS_CACHE = {}
RETRY_COUNTS.clear()
# Do not stop workers; keep them alive
return (
[],
0,
gr.update(value="Waiting..."),
0,
gr.update(choices=[], value=[]),
gr.update(value=None),
)
clear_btn.click(
clear_all,
inputs=None,
outputs=[
ids_state,
store_tick,
info_display,
render_bump,
selected_group,
export_selected_btn,
],
show_progress="hidden",
)
return demo
# ---------------- main ----------------
def _queue_compat(blocks: gr.Blocks):
"""
Gradio version compatibility layer for Blocks.queue:
- Try Gradio 4.x: default_concurrency_limit + status_update_rate
- Fallback to Gradio 3.x: concurrency_count + status_update_rate
- Final fallback: no-arg queue()
"""
try:
# Gradio 4.x path
return blocks.queue(default_concurrency_limit=20, status_update_rate=0.2)
except TypeError:
try:
# Gradio 3.x path
return blocks.queue(concurrency_count=16, status_update_rate=0.2)
except TypeError:
# Minimal fallback
return blocks.queue()
def _launch_compat(app: gr.Blocks, port: int):
"""
Gradio version compatibility for launch parameters.
"""
try:
app.launch(
server_name="0.0.0.0",
server_port=port,
debug=True,
show_api=False, # 3.x/部分4.x可用
)
except TypeError:
# Fallback without show_api
app.launch(
server_name="0.0.0.0",
server_port=port,
debug=True,
)
if __name__ == "__main__":
import sys
port = int(sys.argv[1]) if len(sys.argv) > 1 else 7860
demo = create_gradio_interface()
app = _queue_compat(demo)
_launch_compat(app, port)