#!/usr/bin/env python3
"""
Data Cleaning Script - Cleans model output using a simplified regex method
and saves the results.

Features:
1. Cleans all cases using a simplified regex method.
2. Saves the cleaned data for each case.
3. Preserves the relative order of dict objects.
4. Generates a before-and-after cleaning report.
"""
import datetime
import json
import os
import re
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union


@dataclass
class CleanedData:
    """Container for one case's cleaning result."""
    case_id: int
    original_type: str  # 'list' or 'str'
    original_length: int
    cleaned_data: List[Dict]
    cleaning_operations: Dict[str, Any]  # Records the cleaning operations performed
    success: bool


class OutputCleaner:
    """Data cleaner based on a simplified regex method."""

    def __init__(self):
        # Simplified regular expression patterns:
        # - dict_pattern matches a flat (non-nested) JSON object containing a "bbox" array
        # - bbox_pattern captures the comma-separated coordinates inside "bbox": [...]
        # - missing_delimiter_pattern matches '}' directly followed by '{"', i.e. two
        #   adjacent JSON objects with the separating comma missing; the (?=") lookahead
        #   ensures the next character opens a JSON key
        self.dict_pattern = re.compile(r'\{[^{}]*?"bbox"\s*:\s*\[[^\]]*?\][^{}]*?\}', re.DOTALL)
        self.bbox_pattern = re.compile(r'"bbox"\s*:\s*\[([^\]]+)\]')
        self.missing_delimiter_pattern = re.compile(r'\}\s*\{(?=")')
        self.cleaned_results: List[CleanedData] = []

    def clean_list_data(self, data: List[Dict], case_id: int) -> CleanedData:
        """Cleans list-type data."""
        print(f"šŸ”§ Cleaning List data - Case {case_id}")
        print(f"   Original items: {len(data)}")

        cleaned_data = []
        operations = {
            'type': 'list',
            'bbox_fixes': 0,
            'removed_items': 0,
            'original_count': len(data)
        }

        for i, item in enumerate(data):
            if not isinstance(item, dict):
                operations['removed_items'] += 1
                continue

            # Check the bbox field
            if 'bbox' in item:
                bbox = item['bbox']
                # Check bbox length - core logic
                if isinstance(bbox, list) and len(bbox) == 3:
                    print(f"   āš ļø Item {i}: bbox has only 3 coordinates. Removing bbox, keeping category and text.")
                    # Keep only category and text, preserving key order
                    new_item = {}
                    if 'category' in item:
                        new_item['category'] = item['category']
                    if 'text' in item:
                        new_item['text'] = item['text']
                    if new_item:  # Add only if there is valid content
                        cleaned_data.append(new_item)
                        operations['bbox_fixes'] += 1
                    else:
                        operations['removed_items'] += 1
                    continue
                elif isinstance(bbox, list) and len(bbox) == 4:
                    # bbox is valid; add directly, preserving original order
                    cleaned_data.append(item.copy())
                    continue
                else:
                    print(f"   āŒ Item {i}: Abnormal bbox format, skipping.")
                    operations['removed_items'] += 1
                    continue
            else:
                # No bbox field; keep the item if it has a category
                if 'category' in item:
                    cleaned_data.append(item.copy())
                    continue
                else:
                    operations['removed_items'] += 1

        operations['final_count'] = len(cleaned_data)
        print(f"   āœ… Cleaning complete: {len(cleaned_data)} items, "
              f"{operations['bbox_fixes']} bbox fixes, {operations['removed_items']} items removed")

        return CleanedData(
            case_id=case_id,
            original_type='list',
            original_length=len(data),
            cleaned_data=cleaned_data,
            cleaning_operations=operations,
            success=True
        )
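
    # Example (hypothetical input), illustrating clean_list_data: an item such as
    #   {"bbox": [10, 20, 30], "category": "Text", "text": "hello"}
    # has a 3-coordinate bbox, so it is rewritten to
    #   {"category": "Text", "text": "hello"}
    # while items with a 4-coordinate bbox pass through unchanged.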

    def clean_string_data(self, data_str: str, case_id: int) -> CleanedData:
        """Cleans string-type data."""
        print(f"šŸ”§ Cleaning String data - Case {case_id}")
        print(f"   Original length: {len(data_str):,}")

        operations = {
            'type': 'str',
            'original_length': len(data_str),
            'delimiter_fixes': 0,
            'tail_truncated': False,
            'truncated_length': 0,
            'duplicate_dicts_removed': 0,
            'final_objects': 0
        }

        try:
            # Step 1: Detect and fix missing delimiters
            data_str, delimiter_fixes = self._fix_missing_delimiters(data_str)
            operations['delimiter_fixes'] = delimiter_fixes

            # Step 2: Truncate the last incomplete element
            data_str, tail_truncated = self._truncate_last_incomplete_element(data_str)
            operations['tail_truncated'] = tail_truncated
            operations['truncated_length'] = len(data_str)

            # Step 3: Remove duplicate complete dict objects, preserving order
            data_str, duplicate_removes = self._remove_duplicate_complete_dicts_preserve_order(data_str)
            operations['duplicate_dicts_removed'] = duplicate_removes

            # Step 4: Ensure correct JSON format
            data_str = self._ensure_json_format(data_str)

            # Step 5: Try to parse the final result
            final_data = self._parse_final_json(data_str)

            if final_data is not None:
                operations['final_objects'] = len(final_data)
                print(f"   āœ… Cleaning complete: {len(final_data)} objects")
                return CleanedData(
                    case_id=case_id,
                    original_type='str',
                    original_length=operations['original_length'],
                    cleaned_data=final_data,
                    cleaning_operations=operations,
                    success=True
                )
            else:
                raise ValueError("Could not parse the cleaned data")

        except Exception as e:
            print(f"   āŒ Cleaning failed: {e}")
            return CleanedData(
                case_id=case_id,
                original_type='str',
                original_length=operations['original_length'],
                cleaned_data=[],
                cleaning_operations=operations,
                success=False
            )

    def _fix_missing_delimiters(self, text: str) -> Tuple[str, int]:
        """Fixes missing delimiters between adjacent JSON objects."""
        fixes = 0

        def replace_delimiter(match):
            nonlocal fixes
            fixes += 1
            return '},{'

        text = self.missing_delimiter_pattern.sub(replace_delimiter, text)
        if fixes > 0:
            print(f"   āœ… Fixed {fixes} missing delimiters")
        return text, fixes

    def _truncate_last_incomplete_element(self, text: str) -> Tuple[str, bool]:
        """Truncates the last incomplete element."""
        # For very long text (>50k chars) or text not ending with ']',
        # truncate directly before the last '{"bbox":'
        needs_truncation = (
            len(text) > 50000 or
            not text.strip().endswith(']')
        )

        if needs_truncation:
            # Count how many dict objects there are
            bbox_count = text.count('{"bbox":')

            # If there is at most one dict object, do not truncate,
            # to avoid deleting the only object
            if bbox_count <= 1:
                print(f"   āš ļø Only {bbox_count} dict object(s) found, skipping truncation to avoid deleting all content")
                return text, False

            # Find the position of the last '{"bbox":'
            # (> 0 also guarantees the truncated prefix is non-empty)
            last_bbox_pos = text.rfind('{"bbox":')
            if last_bbox_pos > 0:
                # Truncate before this position
                truncated_text = text[:last_bbox_pos].rstrip()
                # Remove a trailing comma
                if truncated_text.endswith(','):
                    truncated_text = truncated_text[:-1]
                print(f"   āœ‚ļø Truncated the last incomplete element, length reduced from {len(text):,} to {len(truncated_text):,}")
                return truncated_text, True

        return text, False

    def _remove_duplicate_complete_dicts_preserve_order(self, text: str) -> Tuple[str, int]:
        """Removes duplicate complete dict objects, preserving original order."""
        # Extract all dict objects in order of appearance
        dict_matches = list(self.dict_pattern.finditer(text))
        if not dict_matches:
            return text, 0

        print(f"   šŸ“Š Found {len(dict_matches)} dict objects")

        # Deduplicate while preserving order: keep only the first occurrence of each dict
        unique_dicts = []
        seen_dict_strings = set()
        total_duplicates = 0

        for match in dict_matches:
            dict_str = match.group()
            if dict_str not in seen_dict_strings:
                unique_dicts.append(dict_str)
                seen_dict_strings.add(dict_str)
            else:
                total_duplicates += 1

        if total_duplicates > 0:
            # Reconstruct the JSON array, preserving the original order
            new_text = '[' + ', '.join(unique_dicts) + ']'
            print(f"   āœ… Removed {total_duplicates} duplicate dicts, keeping {len(unique_dicts)} unique dicts (order preserved)")
            return new_text, total_duplicates
        else:
            print("   āœ… No duplicate dict objects found")
            return text, 0
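
    # Example (hypothetical input), illustrating the string pipeline above:
    #   '[{"bbox": [1, 2, 3, 4], "category": "Text"} {"bbox": [5, 6, 7, 8], "cat'
    # Step 1 inserts the missing comma between the two objects, and Step 2 drops
    # the truncated trailing '{"bbox": [5, 6, 7, 8], "cat', leaving one complete
    # object that Step 4 wraps back into a parseable JSON array.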

    def _ensure_json_format(self, text: str) -> str:
        """Ensures the text is wrapped as a JSON array."""
        text = text.strip()
        if not text.startswith('['):
            text = '[' + text
        if not text.endswith(']'):
            # Remove a trailing comma before closing the array
            text = text.rstrip(',').rstrip()
            text += ']'
        return text

    def _parse_final_json(self, text: str) -> Optional[List[Dict]]:
        """Tries to parse the final JSON."""
        try:
            data = json.loads(text)
            if isinstance(data, list):
                return data
        except json.JSONDecodeError as e:
            print(f"   āŒ JSON parsing failed: {e}")

            # Fallback 1: extract the valid dict objects individually
            valid_dicts = []
            for match in self.dict_pattern.finditer(text):
                dict_str = match.group()
                try:
                    dict_obj = json.loads(dict_str)
                    valid_dicts.append(dict_obj)
                except json.JSONDecodeError:
                    continue

            if valid_dicts:
                print(f"   āœ… Extracted {len(valid_dicts)} valid dicts")
                return valid_dicts

            # Fallback 2: special handling for a single incomplete dict
            return self._handle_single_incomplete_dict(text)

        return None

    def _handle_single_incomplete_dict(self, text: str) -> Optional[List[Dict]]:
        """Handles the special case of a single incomplete dict."""
        # Only applies when the text is a single truncated dict
        if not text.strip().startswith('[{"bbox":'):
            return None

        try:
            # Try to extract the bbox coordinates
            bbox_match = re.search(r'"bbox"\s*:\s*\[([^\]]+)\]', text)
            if not bbox_match:
                return None

            bbox_str = bbox_match.group(1)
            bbox_coords = [int(x.strip()) for x in bbox_str.split(',')]
            if len(bbox_coords) != 4:
                return None

            # Try to extract the category
            category_match = re.search(r'"category"\s*:\s*"([^"]+)"', text)
            category = category_match.group(1) if category_match else "Text"

            # Try to extract the beginning of the text (first 10,000 characters)
            text_match = re.search(r'"text"\s*:\s*"([^"]{0,10000})', text)
            text_content = text_match.group(1) if text_match else ""

            # Construct the repaired dict
            fixed_dict = {
                "bbox": bbox_coords,
                "category": category
            }
            if text_content:
                fixed_dict["text"] = text_content

            print(f"   šŸ”§ Special fix: single incomplete dict → {fixed_dict}")
            return [fixed_dict]

        except Exception as e:
            print(f"   āŒ Special fix failed: {e}")
            return None
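
    # Example (hypothetical input), illustrating the single-incomplete-dict fallback:
    #   '[{"bbox": [10, 20, 110, 40], "category": "Title", "text": "Intro'
    # cannot be parsed or truncated (only one object), so the bbox, the category,
    # and the readable prefix of "text" are salvaged into one repaired dict:
    #   [{"bbox": [10, 20, 110, 40], "category": "Title", "text": "Intro"}]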

    def remove_duplicate_category_text_pairs_and_bbox(self, data_list: List[dict], case_id: int) -> List[dict]:
        """Removes duplicate category-text pairs and duplicate bboxes."""
        if not data_list or len(data_list) <= 1:
            print(f"   šŸ“Š Data length {len(data_list)} <= 1, skipping deduplication check")
            return data_list

        print(f"   šŸ“Š Original data length: {len(data_list)}")

        # 1. Record occurrences and positions of each category-text pair
        category_text_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'category' in item and 'text' in item:
                pair_key = (item.get('category', ''), item.get('text', ''))
                if pair_key not in category_text_pairs:
                    category_text_pairs[pair_key] = []
                category_text_pairs[pair_key].append(i)

        # 2. Record occurrences and positions of each bbox
        bbox_pairs = {}
        for i, item in enumerate(data_list):
            if isinstance(item, dict) and 'bbox' in item:
                bbox = item.get('bbox')
                if isinstance(bbox, list) and len(bbox) > 0:
                    bbox_key = tuple(bbox)  # Convert to tuple so it can be used as a dict key
                    if bbox_key not in bbox_pairs:
                        bbox_pairs[bbox_key] = []
                    bbox_pairs[bbox_key].append(i)

        # 3. Identify items to be removed
        duplicates_to_remove = set()

        # 3a. Process category-text pairs that appear 5 or more times
        for pair_key, positions in category_text_pairs.items():
            if len(positions) >= 5:
                category, text = pair_key
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f"   šŸ” Found duplicate category-text pair: category='{category}', first 50 chars of text='{text[:50]}...'")
                print(f"      Count: {len(positions)}, removing at positions: {positions_to_remove}")

        # 3b. Process bboxes that appear 2 or more times
        for bbox_key, positions in bbox_pairs.items():
            if len(positions) >= 2:
                # Keep the first occurrence, remove subsequent duplicates
                positions_to_remove = positions[1:]
                duplicates_to_remove.update(positions_to_remove)
                print(f"   šŸ” Found duplicate bbox: {list(bbox_key)}")
                print(f"      Count: {len(positions)}, removing at positions: {positions_to_remove}")

        if not duplicates_to_remove:
            print("   āœ… No category-text pairs or bboxes found exceeding the duplication thresholds")
            return data_list

        # 4. Remove duplicate items from the original data (preserving order)
        cleaned_data = []
        removed_count = 0
        for i, item in enumerate(data_list):
            if i not in duplicates_to_remove:
                cleaned_data.append(item)
            else:
                removed_count += 1

        print(f"   āœ… Deduplication complete: removed {removed_count} duplicate items")
        print(f"   šŸ“Š Cleaned data length: {len(cleaned_data)}")
        return cleaned_data

    def clean_model_output(self, model_output: Union[str, List[Dict]]) -> Union[List[Dict], str]:
        """Cleans a single model output; returns the raw input if cleaning fails."""
        try:
            # Select the cleaning method based on the data type
            if isinstance(model_output, list):
                result = self.clean_list_data(model_output, case_id=0)
            else:
                result = self.clean_string_data(str(model_output), case_id=0)

            # Deduplication step: remove duplicate category-text pairs and bboxes
            if result.success and result.cleaned_data:
                deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(result.cleaned_data, case_id=0)
                # Update the cleaned_data in the CleanedData object
                result.cleaned_data = deduplicated_data

            return result.cleaned_data
        except Exception as e:
            print(f"āŒ Case cleaning failed: {e}")
            return model_output

    def clean_all_data(self, jsonl_path: str) -> List[CleanedData]:
        """Cleans all data from a JSONL file."""
        print(f"šŸš€ Starting to clean JSONL file: {jsonl_path}")

        with open(jsonl_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        datas = []
        for i, line in enumerate(lines):
            if not line.strip():
                continue
            try:
                data = json.loads(line)
                predict_field = data.get('predict')
                case_id = i + 1

                print(f"\n{'='*50}")
                print(f"šŸŽÆ Cleaning Case {case_id}")
                print(f"{'='*50}")

                # Select the cleaning method based on the data type
                if isinstance(predict_field, list):
                    print("šŸ“Š Data type: List")
                    result = self.clean_list_data(predict_field, case_id)
                else:
                    print("šŸ“Š Data type: String")
                    result = self.clean_string_data(str(predict_field), case_id)

                # Deduplication step: remove duplicate category-text pairs and bboxes
                if result.success and result.cleaned_data:
                    print("šŸ”„ Checking for and removing duplicate category-text pairs and bboxes...")
                    deduplicated_data = self.remove_duplicate_category_text_pairs_and_bbox(result.cleaned_data, case_id)
                    # Update the cleaned_data in the CleanedData object
                    result.cleaned_data = deduplicated_data

                data['predict_resized'] = result.cleaned_data
                datas.append(data)
                self.cleaned_results.append(result)
            except Exception as e:
                print(f"āŒ Case {i+1} cleaning failed: {e}")
                traceback.print_exc()

        save_path = jsonl_path.replace('.jsonl', '_filtered.jsonl')
        with open(save_path, 'w', encoding='utf-8') as w:
            for data in datas:
                w.write(json.dumps(data, ensure_ascii=False) + '\n')
        print(f"āœ… Saved cleaned data to: {save_path}")
        return self.cleaned_results
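
    # Example (hypothetical) JSONL input line for clean_all_data; field names other
    # than 'predict' are illustrative. The cleaned list is written back under
    # 'predict_resized' and the line is re-emitted to '<name>_filtered.jsonl':
    #   {"image": "page_001.png", "predict": "[{\"bbox\": [10, 20, 300, 60], \"category\": \"Title\", \"text\": \"Intro\"}]"}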

    def save_cleaned_data(self, output_dir: str):
        """Saves the cleaned data."""
        print(f"\nšŸ’¾ Saving cleaned data to: {output_dir}")
        os.makedirs(output_dir, exist_ok=True)

        # 1. Save the cleaned data for each case
        for result in self.cleaned_results:
            case_filename = f"cleaned_case_{result.case_id:02d}.json"
            case_filepath = os.path.join(output_dir, case_filename)
            with open(case_filepath, 'w', encoding='utf-8') as f:
                json.dump(result.cleaned_data, f, ensure_ascii=False, indent=2)
            print(f"   āœ… Case {result.case_id}: {len(result.cleaned_data)} objects → {case_filename}")

        # 2. Save all cleaned data to a single file
        all_cleaned_data = []
        for result in self.cleaned_results:
            all_cleaned_data.append({
                'case_id': result.case_id,
                'original_type': result.original_type,
                'original_length': result.original_length,
                'cleaned_objects_count': len(result.cleaned_data),
                'success': result.success,
                'cleaning_operations': result.cleaning_operations,
                'cleaned_data': result.cleaned_data
            })

        all_data_filepath = os.path.join(output_dir, "all_cleaned_data.json")
        with open(all_data_filepath, 'w', encoding='utf-8') as f:
            json.dump(all_cleaned_data, f, ensure_ascii=False, indent=2)
        print(f"   šŸ“ All data: {len(all_cleaned_data)} cases → all_cleaned_data.json")

        # 3. Generate a cleaning report
        self._generate_cleaning_report(output_dir)
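
    # The report below is plain text; a run over, say, two cases might produce
    # (values are illustrative):
    #   šŸ“Š Data Cleaning Report
    #   šŸ“ˆ Overall Statistics:
    #      Total Cases: 2
    #      Successfully Cleaned: 2
    #      Success Rate: 100.0%
    #   šŸ“ String Type Cleaning Statistics:
    #      Case 2 āœ…: 52,104 chars → 87 objects
    #         - Delimiter fixes: 2, Tail truncation: -1,532 chars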

    def _generate_cleaning_report(self, output_dir: str):
        """Generates a cleaning report."""
        report = []
        report.append("šŸ“Š Data Cleaning Report")
        report.append("=" * 60)
        report.append(f"Processing Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")

        # Overall statistics (guard against an empty run)
        total_cases = len(self.cleaned_results)
        successful_cases = sum(1 for r in self.cleaned_results if r.success)
        total_objects = sum(len(r.cleaned_data) for r in self.cleaned_results)
        success_rate = successful_cases / total_cases * 100 if total_cases else 0.0

        report.append("šŸ“ˆ Overall Statistics:")
        report.append(f"   Total Cases: {total_cases}")
        report.append(f"   Successfully Cleaned: {successful_cases}")
        report.append(f"   Success Rate: {success_rate:.1f}%")
        report.append(f"   Total Recovered Objects: {total_objects}")
        report.append("")

        # Detailed statistics by original type
        list_results = [r for r in self.cleaned_results if r.original_type == 'list']
        str_results = [r for r in self.cleaned_results if r.original_type == 'str']

        if list_results:
            report.append("šŸ“‹ List Type Cleaning Statistics:")
            for r in list_results:
                ops = r.cleaning_operations
                report.append(f"   Case {r.case_id}: {ops['original_count']} → {ops['final_count']} objects")
                if ops['bbox_fixes'] > 0:
                    report.append(f"      - bbox fixes: {ops['bbox_fixes']}")
                if ops['removed_items'] > 0:
                    report.append(f"      - invalid items removed: {ops['removed_items']}")
            report.append("")

        if str_results:
            report.append("šŸ“ String Type Cleaning Statistics:")
            for r in str_results:
                ops = r.cleaning_operations
                status = "āœ…" if r.success else "āŒ"
                report.append(f"   Case {r.case_id} {status}: {ops['original_length']:,} chars → {ops['final_objects']} objects")
                details = []
                if ops['delimiter_fixes'] > 0:
                    details.append(f"Delimiter fixes: {ops['delimiter_fixes']}")
                if ops['tail_truncated']:
                    reduction = ops['original_length'] - ops['truncated_length']
                    details.append(f"Tail truncation: -{reduction:,} chars")
                if ops['duplicate_dicts_removed'] > 0:
                    details.append(f"Duplicates removed: {ops['duplicate_dicts_removed']}")
                if details:
                    report.append(f"      - {', '.join(details)}")
            report.append("")

        # Note on data order
        report.append("šŸ”„ Data Order Guarantee:")
        report.append("   āœ… The relative order of all dict objects is preserved during cleaning.")
        report.append("   āœ… When deduplicating, the first occurrence of a dict is kept and subsequent duplicates are removed.")
        report.append("   āœ… The order of items in List-type data is fully preserved.")

        # Save the report
        report_filepath = os.path.join(output_dir, "cleaning_report.txt")
        with open(report_filepath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        print("   šŸ“‹ Cleaning report: cleaning_report.txt")

        # Also print the report to the console
        print('\n' + '\n'.join(report))


def main():
    """Main function."""
    # Create a data cleaner instance
    cleaner = OutputCleaner()

    # Input file
    jsonl_path = "output_with_failcase.jsonl"
    # Output directory
    output_dir = "output_with_failcase_cleaned"

    # Clean all data
    cleaner.clean_all_data(jsonl_path)

    # Save the cleaned data
    cleaner.save_cleaned_data(output_dir)

    print("\nšŸŽ‰ Data cleaning complete!")
    print(f"šŸ“ Cleaned data saved in: {output_dir}")


if __name__ == "__main__":
    main()
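
# Example usage (hypothetical), cleaning a single raw model output from another
# module; the module name 'clean_output' is an assumption:
#
#   from clean_output import OutputCleaner
#
#   cleaner = OutputCleaner()
#   raw = '[{"bbox": [1, 2, 3, 4], "category": "Text", "text": "a"} {"bbox": [5, 6'
#   cleaned = cleaner.clean_model_output(raw)
#   # → [{"bbox": [1, 2, 3, 4], "category": "Text", "text": "a"}]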