#!/usr/bin/env python3
"""
Data Cleaning Script - Cleans all data using a simplified regex method and saves the results
Features:
1. Cleans all cases using a simplified regex method.
2. Saves the cleaned data for each case.
3. Ensures the relative order of dicts remains unchanged.
4. Generates a before-and-after cleaning report.
"""
import json
import re
import os
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from collections import Counter
import traceback


@dataclass
class CleanedData:
    """Data structure for cleaned data"""
    case_id: int
    original_type: str  # 'list' or 'str'
    original_length: int
    cleaned_data: List[Dict]
    cleaning_operations: Dict[str, Any]  # Records the cleaning operations performed
    success: bool


class OutputCleaner:
    """Data Cleaner - Based on a simplified regex method"""

    def __init__(self):
        # Simplified regular expression patterns
        self.dict_pattern = re.compile(r'\{[^{}]*?"bbox"\s*:\s*\[[^\]]*?\][^{}]*?\}', re.DOTALL)
        self.bbox_pattern = re.compile(r'"bbox"\s*:\s*\[([^\]]+)\]')
        self.missing_delimiter_pattern = re.compile(r'\}\s*\{(?!")')
        self.cleaned_results: List[CleanedData] = []
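    # Illustrative note (added): dict_pattern is meant to pull one flat JSON object
    # containing a "bbox" array out of a longer string; for example it matches the
    # substring
    #     {"bbox": [10, 20, 110, 220], "category": "Text", "text": "hello"}
    # and bbox_pattern captures the coordinate list "10, 20, 110, 220" from the same
    # fragment. The sample values are made up for illustration.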

    def clean_list_data(self, data: List[Dict], case_id: int) -> CleanedData:
        """Cleans list-type data"""
        print(f"🔧 Cleaning List data - Case {case_id}")
        print(f" Original items: {len(data)}")
        cleaned_data = []
        operations = {
            'type': 'list',
            'bbox_fixes': 0,
            'removed_items': 0,
            'original_count': len(data)
        }
        for i, item in enumerate(data):
            if not isinstance(item, dict):
                operations['removed_items'] += 1
                continue
            # Check the bbox field
            if 'bbox' in item:
                bbox = item['bbox']
                # Check bbox length - core logic
                if isinstance(bbox, list) and len(bbox) == 3:
                    print(f" ⚠️ Item {i}: bbox has only 3 coordinates. Removing bbox, keeping category and text.")
                    # Keep only category and text, ensuring order is preserved
                    new_item = {}
                    if 'category' in item:
                        new_item['category'] = item['category']
                    if 'text' in item:
                        new_item['text'] = item['text']
                    if new_item:  # Add only if there is valid content
                        cleaned_data.append(new_item)
                        operations['bbox_fixes'] += 1
                    else:
                        operations['removed_items'] += 1
                    continue
                elif isinstance(bbox, list) and len(bbox) == 4:
                    # bbox is normal, add directly, preserving original order
                    cleaned_data.append(item.copy())
                    continue
                else:
                    print(f" ❌ Item {i}: Abnormal bbox format, skipping.")
                    operations['removed_items'] += 1
                    continue
            else:
                # No bbox field, keep if category exists
                if 'category' in item:
                    cleaned_data.append(item.copy())
                    continue
                else:
                    operations['removed_items'] += 1
        operations['final_count'] = len(cleaned_data)
        print(f" ✅ Cleaning complete: {len(cleaned_data)} items, {operations['bbox_fixes']} bbox fixes, {operations['removed_items']} items removed")
        return CleanedData(
            case_id=case_id,
            original_type='list',
            original_length=len(data),
            cleaned_data=cleaned_data,
            cleaning_operations=operations,
            success=True
        )

    def clean_string_data(self, data_str: str, case_id: int) -> CleanedData:
        """Cleans string-type data"""
        print(f"🔧 Cleaning String data - Case {case_id}")
        print(f" Original length: {len(data_str):,}")
        operations = {
            'type': 'str',
            'original_length': len(data_str),
            'delimiter_fixes': 0,
            'tail_truncated': False,
            'truncated_length': 0,
            'duplicate_dicts_removed': 0,
            'final_objects': 0
        }
        try:
            # Step 1: Detect and fix missing delimiters
            data_str, delimiter_fixes = self._fix_missing_delimiters(data_str)
            operations['delimiter_fixes'] = delimiter_fixes
            # Step 2: Truncate the last incomplete element
            data_str, tail_truncated = self._truncate_last_incomplete_element(data_str)
            operations['tail_truncated'] = tail_truncated
            operations['truncated_length'] = len(data_str)
            # Step 3: Remove duplicate complete dict objects, preserving order
            data_str, duplicate_removes = self._remove_duplicate_complete_dicts_preserve_order(data_str)
            operations['duplicate_dicts_removed'] = duplicate_removes
            # Step 4: Ensure correct JSON format
            data_str = self._ensure_json_format(data_str)
            # Step 5: Try to parse the final result
            final_data = self._parse_final_json(data_str)
            if final_data is not None:
                operations['final_objects'] = len(final_data)
                print(f" ✅ Cleaning complete: {len(final_data)} objects")
                return CleanedData(
                    case_id=case_id,
                    original_type='str',
                    original_length=operations['original_length'],
                    cleaned_data=final_data,
                    cleaning_operations=operations,
                    success=True
                )
            else:
                raise Exception("Could not parse the cleaned data")
        except Exception as e:
            print(f" ❌ Cleaning failed: {e}")
            return CleanedData(
                case_id=case_id,
                original_type='str',
                original_length=operations['original_length'],
                cleaned_data=[],
                cleaning_operations=operations,
                success=False
            )

    def _fix_missing_delimiters(self, text: str) -> Tuple[str, int]:
        """Fixes missing delimiters"""
        fixes = 0

        def replace_delimiter(match):
            nonlocal fixes
            fixes += 1
            return '},{'

        text = self.missing_delimiter_pattern.sub(replace_delimiter, text)
        if fixes > 0:
            print(f" ✅ Fixed {fixes} missing delimiters")
        return text, fixes
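    # Illustrative note (added): with missing_delimiter_pattern as defined in __init__,
    # a fragment such as
    #     '{"category": "Text"} { "bbox": [1, 2, 3, 4]}'
    # becomes
    #     '{"category": "Text"},{ "bbox": [1, 2, 3, 4]}'
    # i.e. a comma is inserted between back-to-back objects. Because of the negative
    # lookahead (?!"), a brace pair written as '}{"bbox":' is left untouched as written.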

    def _truncate_last_incomplete_element(self, text: str) -> Tuple[str, bool]:
        """Truncates the last incomplete element"""
        # For very long text (>50k) or text not ending with ']', directly truncate the last '{"bbox":'
        needs_truncation = (
            len(text) > 50000 or
            not text.strip().endswith(']')
        )
        if needs_truncation:
            # Check how many dict objects there are
            bbox_count = text.count('{"bbox":')
            # If there is only one dict object, do not truncate to avoid deleting the only object
            if bbox_count <= 1:
                print(f" ⚠️ Only {bbox_count} dict objects found, skipping truncation to avoid deleting all content")
                return text, False
            # Find the position of the last '{"bbox":'
            last_bbox_pos = text.rfind('{"bbox":')
            if last_bbox_pos > 0:
                # Truncate before this position
                truncated_text = text[:last_bbox_pos].rstrip()
                # Remove trailing comma
                if truncated_text.endswith(','):
                    truncated_text = truncated_text[:-1]
                print(f" ✂️ Truncated the last incomplete element, length reduced from {len(text):,} to {len(truncated_text):,}")
                return truncated_text, True
        return text, False
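    # Illustrative note (added): for an input that does not end with ']' and contains
    # more than one '{"bbox":' object, e.g.
    #     '[{"bbox": [1, 2, 3, 4], "category": "Text"}, {"bbox": [5, 6'
    # the string is cut back to the start of the last (incomplete) object and the
    # dangling comma is removed, leaving
    #     '[{"bbox": [1, 2, 3, 4], "category": "Text"}'
    # The missing closing ']' is restored later by _ensure_json_format.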

    def _remove_duplicate_complete_dicts_preserve_order(self, text: str) -> Tuple[str, int]:
        """Removes duplicate complete dict objects, preserving original order"""
        # Extract all dict objects, preserving order
        dict_matches = list(self.dict_pattern.finditer(text))
        if not dict_matches:
            return text, 0
        print(f" 📊 Found {len(dict_matches)} dict objects")
        # Deduplication while preserving order: only keep the first occurrence of a dict
        unique_dicts = []
        seen_dict_strings = set()
        total_duplicates = 0
        for match in dict_matches:
            dict_str = match.group()
            if dict_str not in seen_dict_strings:
                unique_dicts.append(dict_str)
                seen_dict_strings.add(dict_str)
            else:
                total_duplicates += 1
        if total_duplicates > 0:
            # Reconstruct the JSON array, preserving the original order
            new_text = '[' + ', '.join(unique_dicts) + ']'
            print(f" ✅ Removed {total_duplicates} duplicate dicts, keeping {len(unique_dicts)} unique dicts (order preserved)")
            return new_text, total_duplicates
        else:
            print(" ✅ No duplicate dict objects found")
            return text, 0

    def _ensure_json_format(self, text: str) -> str:
        """Ensures correct JSON format"""
        text = text.strip()
        if not text.startswith('['):
            text = '[' + text
        if not text.endswith(']'):
            # Remove trailing comma
            text = text.rstrip(',').rstrip()
            text += ']'
        return text

    def _parse_final_json(self, text: str) -> Optional[List[Dict]]:
        """Tries to parse the final JSON"""
        try:
            data = json.loads(text)
            if isinstance(data, list):
                return data
        except json.JSONDecodeError as e:
            print(f" ❌ JSON parsing failed: {e}")
            # Fallback 1: extract valid dict objects one by one
            valid_dicts = []
            for match in self.dict_pattern.finditer(text):
                dict_str = match.group()
                try:
                    dict_obj = json.loads(dict_str)
                    valid_dicts.append(dict_obj)
                except json.JSONDecodeError:
                    continue
            if valid_dicts:
                print(f" ✅ Extracted {len(valid_dicts)} valid dicts")
                return valid_dicts
            # Fallback 2: special handling for a single incomplete dict
            return self._handle_single_incomplete_dict(text)
        return None

    def _handle_single_incomplete_dict(self, text: str) -> Optional[List[Dict]]:
        """Handles the special case of a single incomplete dict"""
        # Check if it's a single incomplete dict case
        if not text.strip().startswith('[{"bbox":'):
            return None
        try:
            # Try to extract bbox coordinates
            bbox_match = re.search(r'"bbox"\s*:\s*\[([^\]]+)\]', text)
            if not bbox_match:
                return None
            bbox_str = bbox_match.group(1)
            bbox_coords = [int(x.strip()) for x in bbox_str.split(',')]
            if len(bbox_coords) != 4:
                return None
            # Try to extract category
            category_match = re.search(r'"category"\s*:\s*"([^"]+)"', text)
            category = category_match.group(1) if category_match else "Text"
            # Try to extract the beginning of the text (first 10000 characters)
            text_match = re.search(r'"text"\s*:\s*"([^"]{0,10000})', text)
            if text_match:
                text_content = text_match.group(1)
            else:
                text_content = ""
            # Construct the fixed dict
            fixed_dict = {
                "bbox": bbox_coords,
                "category": category
            }
            if text_content:
                fixed_dict["text"] = text_content
            print(f" 🔧 Special fix: single incomplete dict → {fixed_dict}")
            return [fixed_dict]
        except Exception as e:
            print(f" ❌ Special fix failed: {e}")
            return None

    def remove_duplicate_category_text_pairs_and_bbox(self, data_list: List[dict], case_id: int) -> List[dict]:
        """Removes duplicate category-text pairs and duplicate bboxes"""
        if not data_list or len(data_list) <= 1:
            print(f" 📊 Data length {len(data_list)} <= 1, skipping deduplication check")
            return data_list
        print(f" 📊 Original data length: {len(data_list)}")
        # 1. Count occurrences and positions of each category-text pair
        category_text_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'category' in item and 'text' in item:
                pair_key = (item.get('category', ''), item.get('text', ''))
                if pair_key not in category_text_pairs:
                    category_text_pairs[pair_key] = []
                category_text_pairs[pair_key].append(i)
        # 2. Count occurrences and positions of each bbox
        bbox_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'bbox' in item:
                bbox = item.get('bbox')
                if isinstance(bbox, list) and len(bbox) > 0:
                    bbox_key = tuple(bbox)  # Convert to tuple to use as a dictionary key
                    if bbox_key not in bbox_pairs:
                        bbox_pairs[bbox_key] = []
                    bbox_pairs[bbox_key].append(i)
        # 3. Identify items to be removed
        duplicates_to_remove = set()
        # 3a. Process category-text pairs that appear 5 or more times
        for pair_key, positions in category_text_pairs.items():
            if len(positions) >= 5:
                category, text = pair_key
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f" 🔍 Found duplicate category-text pair: category='{category}', first 50 chars of text='{text[:50]}...'")
                print(f" Count: {len(positions)}, removing at positions: {positions_to_remove}")
        # 3b. Process bboxes that appear 2 or more times
        for bbox_key, positions in bbox_pairs.items():
            if len(positions) >= 2:
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f" 🔍 Found duplicate bbox: {list(bbox_key)}")
                print(f" Count: {len(positions)}, removing at positions: {positions_to_remove}")
        if not duplicates_to_remove:
            print(" ✅ No category-text pairs or bboxes found exceeding the duplication threshold")
            return data_list
        # 4. Remove duplicate items from the original data (preserving order)
        cleaned_data = []
        removed_count = 0
        for i, item in enumerate(data_list):
            if i not in duplicates_to_remove:
                cleaned_data.append(item)
            else:
                removed_count += 1
        print(f" ✅ Deduplication complete: Removed {removed_count} duplicate items")
        print(f" 📊 Cleaned data length: {len(cleaned_data)}")
        return cleaned_data
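    # Illustrative note (added): with the thresholds above, a (category, text) pair must
    # repeat at least 5 times before its later copies are dropped, while any bbox seen
    # 2 or more times keeps only its first occurrence. For example, five identical
    # {"category": "Text", "text": "..."} items collapse to one, and two items sharing
    # bbox [10, 20, 110, 220] collapse to the earlier of the two (values made up).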

    def clean_model_output(self, model_output: str):
        try:
            # Select cleaning method based on data type
            if isinstance(model_output, list):
                result = self.clean_list_data(model_output, case_id=0)
            else:
                result = self.clean_string_data(str(model_output), case_id=0)
            # Add deduplication step: remove duplicate category-text pairs and bboxes
            if result and hasattr(result, 'success') and result.success and result.cleaned_data:
                original_data = result.cleaned_data
                deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(original_data, case_id=0)
                # Update the cleaned_data in the CleanedData object
                result.cleaned_data = deduplicated_data
            return result.cleaned_data
        except Exception as e:
            print(f"❌ Case cleaning failed: {e}")
            return model_output

    def clean_all_data(self, jsonl_path: str) -> List[CleanedData]:
        """Cleans all data from a JSONL file"""
        print(f"🚀 Starting to clean JSONL file: {jsonl_path}")
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        datas = []
        for i, line in enumerate(lines):
            if line.strip():
                try:
                    data = json.loads(line)
                    predict_field = data.get('predict')
                    case_id = i + 1
                    print(f"\n{'='*50}")
                    print(f"🎯 Cleaning Case {case_id}")
                    print(f"{'='*50}")
                    # Select cleaning method based on data type
                    if isinstance(predict_field, list):
                        print("📊 Data type: List")
                        result = self.clean_list_data(predict_field, case_id)
                    else:
                        print("📊 Data type: String")
                        result = self.clean_string_data(str(predict_field), case_id)
                    # Add deduplication step: remove duplicate category-text pairs and bboxes
                    if result and hasattr(result, 'success') and result.success and result.cleaned_data:
                        print("🔄 Checking for and removing duplicate category-text pairs and bboxes...")
                        original_data = result.cleaned_data
                        deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(original_data, case_id)
                        # Update the cleaned_data in the CleanedData object
                        result.cleaned_data = deduplicated_data
                    data['predict_resized'] = result.cleaned_data
                    datas.append(data)
                    self.cleaned_results.append(result)
                except Exception as e:
                    print(f"❌ Case {i+1} cleaning failed: {e}")
                    traceback.print_exc()
        save_path = jsonl_path.replace('.jsonl', '_filtered.jsonl')
        with open(save_path, 'w') as w:
            for data in datas:
                w.write(json.dumps(data, ensure_ascii=False) + '\n')
        print(f"✅ Saved cleaned data to: {save_path}")
        return self.cleaned_results

    def save_cleaned_data(self, output_dir: str):
        """Saves the cleaned data"""
        print(f"\n💾 Saving cleaned data to: {output_dir}")
        os.makedirs(output_dir, exist_ok=True)
        # 1. Save cleaned data for each case
        for result in self.cleaned_results:
            case_filename = f"cleaned_case_{result.case_id:02d}.json"
            case_filepath = os.path.join(output_dir, case_filename)
            # Save the cleaned data
            with open(case_filepath, 'w', encoding='utf-8') as f:
                json.dump(result.cleaned_data, f, ensure_ascii=False, indent=2)
            print(f" ✅ Case {result.case_id}: {len(result.cleaned_data)} objects → {case_filename}")
        # 2. Save all cleaned data to a single file
        all_cleaned_data = []
        for result in self.cleaned_results:
            all_cleaned_data.append({
                'case_id': result.case_id,
                'original_type': result.original_type,
                'original_length': result.original_length,
                'cleaned_objects_count': len(result.cleaned_data),
                'success': result.success,
                'cleaning_operations': result.cleaning_operations,
                'cleaned_data': result.cleaned_data
            })
        all_data_filepath = os.path.join(output_dir, "all_cleaned_data.json")
        with open(all_data_filepath, 'w', encoding='utf-8') as f:
            json.dump(all_cleaned_data, f, ensure_ascii=False, indent=2)
        print(f" 📁 All data: {len(all_cleaned_data)} cases → all_cleaned_data.json")
        # 3. Generate a cleaning report
        self._generate_cleaning_report(output_dir)

    def _generate_cleaning_report(self, output_dir: str):
        """Generates a cleaning report"""
        report = []
        report.append("📊 Data Cleaning Report")
        report.append("=" * 60)
        import datetime
        report.append(f"Processing Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")
        # Overall statistics
        total_cases = len(self.cleaned_results)
        successful_cases = sum(1 for r in self.cleaned_results if r.success)
        total_objects = sum(len(r.cleaned_data) for r in self.cleaned_results)
        report.append("📈 Overall Statistics:")
        report.append(f" Total Cases: {total_cases}")
        report.append(f" Successfully Cleaned: {successful_cases}")
        report.append(f" Success Rate: {successful_cases/total_cases*100:.1f}%")
        report.append(f" Total Recovered Objects: {total_objects}")
        report.append("")
        # Detailed statistics
        list_results = [r for r in self.cleaned_results if r.original_type == 'list']
        str_results = [r for r in self.cleaned_results if r.original_type == 'str']
        if list_results:
            report.append("📋 List Type Cleaning Statistics:")
            for r in list_results:
                ops = r.cleaning_operations
                report.append(f" Case {r.case_id}: {ops['original_count']} → {ops['final_count']} objects")
                if ops['bbox_fixes'] > 0:
                    report.append(f" - bbox fixes: {ops['bbox_fixes']}")
                if ops['removed_items'] > 0:
                    report.append(f" - invalid items removed: {ops['removed_items']}")
            report.append("")
        if str_results:
            report.append("📝 String Type Cleaning Statistics:")
            for r in str_results:
                ops = r.cleaning_operations
                status = "✅" if r.success else "❌"
                report.append(f" Case {r.case_id} {status}: {ops['original_length']:,} chars → {ops['final_objects']} objects")
                details = []
                if ops['delimiter_fixes'] > 0:
                    details.append(f"Delimiter fixes: {ops['delimiter_fixes']}")
                if ops['tail_truncated']:
                    reduction = ops['original_length'] - ops['truncated_length']
                    details.append(f"Tail truncation: -{reduction:,} chars")
                if ops['duplicate_dicts_removed'] > 0:
                    details.append(f"Duplicates removed: {ops['duplicate_dicts_removed']}")
                if details:
                    report.append(f" - {', '.join(details)}")
            report.append("")
        # Note on data order
        report.append("🔄 Data Order Guarantee:")
        report.append(" ✅ The relative order of all dict objects is preserved during cleaning.")
        report.append(" ✅ When deduplicating, the first occurrence of a dict is kept, and subsequent duplicates are removed.")
        report.append(" ✅ The order of items in List-type data is fully preserved.")
        # Save the report
        report_filepath = os.path.join(output_dir, "cleaning_report.txt")
        with open(report_filepath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        print(f" 📋 Cleaning report: cleaning_report.txt")
        # Also print to console
        print(f"\n{chr(10).join(report)}")


def main():
    """Main function"""
    # Create a data cleaner instance
    cleaner = OutputCleaner()
    # Input file
    jsonl_path = "output_with_failcase.jsonl"
    # Output directory
    output_dir = "output_with_failcase_cleaned"
    # Clean all data
    results = cleaner.clean_all_data(jsonl_path)
    # Save the cleaned data
    cleaner.save_cleaned_data(output_dir)
    print(f"\n🎉 Data cleaning complete!")
    print(f"📁 Cleaned data saved in: {output_dir}")


if __name__ == "__main__":
    main()
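
# Example (added, not part of the original script): cleaning a single raw model-output
# string via the OutputCleaner API defined above; the input below is made up and ends
# before its closing ']' to mimic a truncated generation.
#
#     cleaner = OutputCleaner()
#     cleaned = cleaner.clean_model_output('[{"bbox": [1, 2, 3, 4], "category": "Text", "text": "hi"}')
#     # -> [{'bbox': [1, 2, 3, 4], 'category': 'Text', 'text': 'hi'}]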