| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546 |
- import os
- import sys
- import json
- import re
- import math
- import unicodedata
- from collections import Counter
- import numpy as np
- from termcolor import cprint
- from magic_pdf.libs import fitz
- from magic_pdf.libs import NLPModels
- if sys.version_info[0] >= 3:
- sys.stdout.reconfigure(encoding="utf-8") # type: ignore
- def open_pdf(pdf_path):
- try:
- pdf_document = fitz.open(pdf_path) # type: ignore
- return pdf_document
- except Exception as e:
- print(f"无法打开PDF文件:{pdf_path}。原因是:{e}")
- raise e
- def print_green_on_red(text):
- cprint(text, "green", "on_red", attrs=["bold"], end="\n\n")
- def print_green(text):
- print()
- cprint(text, "green", attrs=["bold"], end="\n\n")
- def print_red(text):
- print()
- cprint(text, "red", attrs=["bold"], end="\n\n")
- def print_yellow(text):
- print()
- cprint(text, "yellow", attrs=["bold"], end="\n\n")
- def safe_get(dict_obj, key, default):
- val = dict_obj.get(key)
- if val is None:
- return default
- else:
- return val
- def is_bbox_overlap(bbox1, bbox2):
- """
- This function checks if bbox1 and bbox2 overlap or not
- Parameters
- ----------
- bbox1 : list
- bbox1
- bbox2 : list
- bbox2
- Returns
- -------
- bool
- True if bbox1 and bbox2 overlap, else False
- """
- x0_1, y0_1, x1_1, y1_1 = bbox1
- x0_2, y0_2, x1_2, y1_2 = bbox2
- if x0_1 > x1_2 or x0_2 > x1_1:
- return False
- if y0_1 > y1_2 or y0_2 > y1_1:
- return False
- return True
- def is_in_bbox(bbox1, bbox2):
- """
- This function checks if bbox1 is in bbox2
- Parameters
- ----------
- bbox1 : list
- bbox1
- bbox2 : list
- bbox2
- Returns
- -------
- bool
- True if bbox1 is in bbox2, else False
- """
- x0_1, y0_1, x1_1, y1_1 = bbox1
- x0_2, y0_2, x1_2, y1_2 = bbox2
- if x0_1 >= x0_2 and y0_1 >= y0_2 and x1_1 <= x1_2 and y1_1 <= y1_2:
- return True
- else:
- return False
- def calculate_para_bbox(lines):
- """
- This function calculates the minimum bbox of the paragraph
- Parameters
- ----------
- lines : list
- lines
- Returns
- -------
- para_bbox : list
- bbox of the paragraph
- """
- x0 = min(line["bbox"][0] for line in lines)
- y0 = min(line["bbox"][1] for line in lines)
- x1 = max(line["bbox"][2] for line in lines)
- y1 = max(line["bbox"][3] for line in lines)
- return [x0, y0, x1, y1]
- def is_line_right_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
- """
- This function checks if the line is right aligned from its neighbors
- Parameters
- ----------
- curr_line_bbox : list
- bbox of the current line
- prev_line_bbox : list
- bbox of the previous line
- next_line_bbox : list
- bbox of the next line
- avg_char_width : float
- average of char widths
- direction : int
- 0 for prev, 1 for next, 2 for both
- Returns
- -------
- bool
- True if the line is right aligned from its neighbors, False otherwise.
- """
- horizontal_ratio = 0.5
- horizontal_thres = horizontal_ratio * avg_char_width
- _, _, x1, _ = curr_line_bbox
- _, _, prev_x1, _ = prev_line_bbox if prev_line_bbox else (0, 0, 0, 0)
- _, _, next_x1, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
- if direction == 0:
- return abs(x1 - prev_x1) < horizontal_thres
- elif direction == 1:
- return abs(x1 - next_x1) < horizontal_thres
- elif direction == 2:
- return abs(x1 - prev_x1) < horizontal_thres and abs(x1 - next_x1) < horizontal_thres
- else:
- return False
- def is_line_left_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
- """
- This function checks if the line is left aligned from its neighbors
- Parameters
- ----------
- curr_line_bbox : list
- bbox of the current line
- prev_line_bbox : list
- bbox of the previous line
- next_line_bbox : list
- bbox of the next line
- avg_char_width : float
- average of char widths
- direction : int
- 0 for prev, 1 for next, 2 for both
- Returns
- -------
- bool
- True if the line is left aligned from its neighbors, False otherwise.
- """
- horizontal_ratio = 0.5
- horizontal_thres = horizontal_ratio * avg_char_width
- x0, _, _, _ = curr_line_bbox
- prev_x0, _, _, _ = prev_line_bbox if prev_line_bbox else (0, 0, 0, 0)
- next_x0, _, _, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
- if direction == 0:
- return abs(x0 - prev_x0) < horizontal_thres
- elif direction == 1:
- return abs(x0 - next_x0) < horizontal_thres
- elif direction == 2:
- return abs(x0 - prev_x0) < horizontal_thres and abs(x0 - next_x0) < horizontal_thres
- else:
- return False
- def end_with_punctuation(line_text):
- """
- This function checks if the line ends with punctuation marks
- """
- english_end_puncs = [".", "?", "!"]
- chinese_end_puncs = ["。", "?", "!"]
- end_puncs = english_end_puncs + chinese_end_puncs
- last_non_space_char = None
- for ch in line_text[::-1]:
- if not ch.isspace():
- last_non_space_char = ch
- break
- if last_non_space_char is None:
- return False
- return last_non_space_char in end_puncs
- def is_nested_list(lst):
- if isinstance(lst, list):
- return any(isinstance(sub, list) for sub in lst)
- return False
- class DenseSingleLineBlockException(Exception):
- """
- This class defines the exception type for dense single line-block.
- """
- def __init__(self, message="DenseSingleLineBlockException"):
- self.message = message
- super().__init__(self.message)
- def __str__(self):
- return f"{self.message}"
- def __repr__(self):
- return f"{self.message}"
- class TitleDetectionException(Exception):
- """
- This class defines the exception type for title detection.
- """
- def __init__(self, message="TitleDetectionException"):
- self.message = message
- super().__init__(self.message)
- def __str__(self):
- return f"{self.message}"
- def __repr__(self):
- return f"{self.message}"
- class TitleLevelException(Exception):
- """
- This class defines the exception type for title level.
- """
- def __init__(self, message="TitleLevelException"):
- self.message = message
- super().__init__(self.message)
- def __str__(self):
- return f"{self.message}"
- def __repr__(self):
- return f"{self.message}"
- class ParaSplitException(Exception):
- """
- This class defines the exception type for paragraph splitting.
- """
- def __init__(self, message="ParaSplitException"):
- self.message = message
- super().__init__(self.message)
- def __str__(self):
- return f"{self.message}"
- def __repr__(self):
- return f"{self.message}"
- class ParaMergeException(Exception):
- """
- This class defines the exception type for paragraph merging.
- """
- def __init__(self, message="ParaMergeException"):
- self.message = message
- super().__init__(self.message)
- def __str__(self):
- return f"{self.message}"
- def __repr__(self):
- return f"{self.message}"
- class DiscardByException:
- """
- This class discards pdf files by exception
- """
- def __init__(self) -> None:
- pass
- def discard_by_single_line_block(self, pdf_dic, exception: DenseSingleLineBlockException):
- """
- This function discards pdf files by single line block exception
- Parameters
- ----------
- pdf_dic : dict
- pdf dictionary
- exception : str
- exception message
- Returns
- -------
- error_message : str
- """
- exception_page_nums = 0
- page_num = 0
- for page_id, page in pdf_dic.items():
- if page_id.startswith("page_"):
- page_num += 1
- if "preproc_blocks" in page.keys():
- preproc_blocks = page["preproc_blocks"]
- all_single_line_blocks = []
- for block in preproc_blocks:
- if len(block["lines"]) == 1:
- all_single_line_blocks.append(block)
- if len(preproc_blocks) > 0 and len(all_single_line_blocks) / len(preproc_blocks) > 0.9:
- exception_page_nums += 1
- if page_num == 0:
- return None
- if exception_page_nums / page_num > 0.1: # Low ratio means basically, whenever this is the case, it is discarded
- return exception.message
- return None
- def discard_by_title_detection(self, pdf_dic, exception: TitleDetectionException):
- """
- This function discards pdf files by title detection exception
- Parameters
- ----------
- pdf_dic : dict
- pdf dictionary
- exception : str
- exception message
- Returns
- -------
- error_message : str
- """
- # return exception.message
- return None
- def discard_by_title_level(self, pdf_dic, exception: TitleLevelException):
- """
- This function discards pdf files by title level exception
- Parameters
- ----------
- pdf_dic : dict
- pdf dictionary
- exception : str
- exception message
- Returns
- -------
- error_message : str
- """
- # return exception.message
- return None
- def discard_by_split_para(self, pdf_dic, exception: ParaSplitException):
- """
- This function discards pdf files by split para exception
- Parameters
- ----------
- pdf_dic : dict
- pdf dictionary
- exception : str
- exception message
- Returns
- -------
- error_message : str
- """
- # return exception.message
- return None
- def discard_by_merge_para(self, pdf_dic, exception: ParaMergeException):
- """
- This function discards pdf files by merge para exception
- Parameters
- ----------
- pdf_dic : dict
- pdf dictionary
- exception : str
- exception message
- Returns
- -------
- error_message : str
- """
- # return exception.message
- return None
- class LayoutFilterProcessor:
- def __init__(self) -> None:
- pass
- def batch_process_blocks(self, pdf_dict):
- """
- This function processes the blocks in batch.
- Parameters
- ----------
- self : object
- The instance of the class.
- pdf_dict : dict
- pdf dictionary
- Returns
- -------
- pdf_dict : dict
- pdf dictionary
- """
- for page_id, blocks in pdf_dict.items():
- if page_id.startswith("page_"):
- if "layout_bboxes" in blocks.keys() and "para_blocks" in blocks.keys():
- layout_bbox_objs = blocks["layout_bboxes"]
- if layout_bbox_objs is None:
- continue
- layout_bboxes = [bbox_obj["layout_bbox"] for bbox_obj in layout_bbox_objs]
- # Enlarge each value of x0, y0, x1, y1 for each layout_bbox to prevent loss of text.
- layout_bboxes = [
- [math.ceil(x0), math.ceil(y0), math.ceil(x1), math.ceil(y1)] for x0, y0, x1, y1 in layout_bboxes
- ]
- para_blocks = blocks["para_blocks"]
- if para_blocks is None:
- continue
- for lb_bbox in layout_bboxes:
- for i, para_block in enumerate(para_blocks):
- para_bbox = para_block["bbox"]
- para_blocks[i]["in_layout"] = 0
- if is_in_bbox(para_bbox, lb_bbox):
- para_blocks[i]["in_layout"] = 1
- blocks["para_blocks"] = para_blocks
- return pdf_dict
- class RawBlockProcessor:
- def __init__(self) -> None:
- self.y_tolerance = 2
- self.pdf_dic = {}
- def __span_flags_decomposer(self, span_flags):
- """
- Make font flags human readable.
- Parameters
- ----------
- self : object
- The instance of the class.
- span_flags : int
- span flags
- Returns
- -------
- l : dict
- decomposed flags
- """
- l = {
- "is_superscript": False,
- "is_italic": False,
- "is_serifed": False,
- "is_sans_serifed": False,
- "is_monospaced": False,
- "is_proportional": False,
- "is_bold": False,
- }
- if span_flags & 2**0:
- l["is_superscript"] = True # 表示上标
- if span_flags & 2**1:
- l["is_italic"] = True # 表示斜体
- if span_flags & 2**2:
- l["is_serifed"] = True # 表示衬线字体
- else:
- l["is_sans_serifed"] = True # 表示非衬线字体
- if span_flags & 2**3:
- l["is_monospaced"] = True # 表示等宽字体
- else:
- l["is_proportional"] = True # 表示比例字体
- if span_flags & 2**4:
- l["is_bold"] = True # 表示粗体
- return l
- def __make_new_lines(self, raw_lines):
- """
- This function makes new lines.
- Parameters
- ----------
- self : object
- The instance of the class.
- raw_lines : list
- raw lines
- Returns
- -------
- new_lines : list
- new lines
- """
- new_lines = []
- new_line = None
- for raw_line in raw_lines:
- raw_line_bbox = raw_line["bbox"]
- raw_line_spans = raw_line["spans"]
- raw_line_text = "".join([span["text"] for span in raw_line_spans])
- raw_line_dir = raw_line.get("dir", None)
- decomposed_line_spans = []
- for span in raw_line_spans:
- raw_flags = span["flags"]
- decomposed_flags = self.__span_flags_decomposer(raw_flags)
- span["decomposed_flags"] = decomposed_flags
- decomposed_line_spans.append(span)
- if new_line is None: # Handle the first line
- new_line = {
- "bbox": raw_line_bbox,
- "text": raw_line_text,
- "dir": raw_line_dir if raw_line_dir else (0, 0),
- "spans": decomposed_line_spans,
- }
- else: # Handle the rest lines
- if (
- abs(raw_line_bbox[1] - new_line["bbox"][1]) <= self.y_tolerance
- and abs(raw_line_bbox[3] - new_line["bbox"][3]) <= self.y_tolerance
- ):
- new_line["bbox"] = (
- min(new_line["bbox"][0], raw_line_bbox[0]), # left
- new_line["bbox"][1], # top
- max(new_line["bbox"][2], raw_line_bbox[2]), # right
- raw_line_bbox[3], # bottom
- )
- new_line["text"] += raw_line_text
- new_line["spans"].extend(raw_line_spans)
- new_line["dir"] = (
- new_line["dir"][0] + raw_line_dir[0],
- new_line["dir"][1] + raw_line_dir[1],
- )
- else:
- new_lines.append(new_line)
- new_line = {
- "bbox": raw_line_bbox,
- "text": raw_line_text,
- "dir": raw_line_dir if raw_line_dir else (0, 0),
- "spans": raw_line_spans,
- }
- if new_line:
- new_lines.append(new_line)
- return new_lines
- def __make_new_block(self, raw_block):
- """
- This function makes a new block.
- Parameters
- ----------
- self : object
- The instance of the class.
- ----------
- raw_block : dict
- a raw block
- Returns
- -------
- new_block : dict
- """
- new_block = {}
- block_id = raw_block["number"]
- block_bbox = raw_block["bbox"]
- block_text = "".join(span["text"] for line in raw_block["lines"] for span in line["spans"])
- raw_lines = raw_block["lines"]
- block_lines = self.__make_new_lines(raw_lines)
- new_block["block_id"] = block_id
- new_block["bbox"] = block_bbox
- new_block["text"] = block_text
- new_block["lines"] = block_lines
- return new_block
- def batch_process_blocks(self, pdf_dic):
- """
- This function processes the blocks in batch.
- Parameters
- ----------
- self : object
- The instance of the class.
- ----------
- blocks : list
- Input block is a list of raw blocks.
- Returns
- -------
- result_dict : dict
- result dictionary
- """
- for page_id, blocks in pdf_dic.items():
- if page_id.startswith("page_"):
- para_blocks = []
- if "preproc_blocks" in blocks.keys():
- input_blocks = blocks["preproc_blocks"]
- for raw_block in input_blocks:
- new_block = self.__make_new_block(raw_block)
- para_blocks.append(new_block)
- blocks["para_blocks"] = para_blocks
- return pdf_dic
- class BlockStatisticsCalculator:
- """
- This class calculates the statistics of the block.
- """
- def __init__(self) -> None:
- pass
- def __calc_stats_of_new_lines(self, new_lines):
- """
- This function calculates the paragraph metrics
- Parameters
- ----------
- combined_lines : list
- combined lines
- Returns
- -------
- X0 : float
- Median of x0 values, which represents the left average boundary of the block
- X1 : float
- Median of x1 values, which represents the right average boundary of the block
- avg_char_width : float
- Average of char widths, which represents the average char width of the block
- avg_char_height : float
- Average of line heights, which represents the average line height of the block
- """
- x0_values = []
- x1_values = []
- char_widths = []
- char_heights = []
- block_font_types = []
- block_font_sizes = []
- block_directions = []
- if len(new_lines) > 0:
- for i, line in enumerate(new_lines):
- line_bbox = line["bbox"]
- line_text = line["text"]
- line_spans = line["spans"]
- num_chars = len([ch for ch in line_text if not ch.isspace()])
- x0_values.append(line_bbox[0])
- x1_values.append(line_bbox[2])
- if num_chars > 0:
- char_width = (line_bbox[2] - line_bbox[0]) / num_chars
- char_widths.append(char_width)
- for span in line_spans:
- block_font_types.append(span["font"])
- block_font_sizes.append(span["size"])
- if "dir" in line:
- block_directions.append(line["dir"])
- # line_font_types = [span["font"] for span in line_spans]
- char_heights = [span["size"] for span in line_spans]
- X0 = np.median(x0_values) if x0_values else 0
- X1 = np.median(x1_values) if x1_values else 0
- avg_char_width = sum(char_widths) / len(char_widths) if char_widths else 0
- avg_char_height = sum(char_heights) / len(char_heights) if char_heights else 0
- # max_freq_font_type = max(set(block_font_types), key=block_font_types.count) if block_font_types else None
- max_span_length = 0
- max_span_font_type = None
- for line in new_lines:
- line_spans = line["spans"]
- for span in line_spans:
- span_length = span["bbox"][2] - span["bbox"][0]
- if span_length > max_span_length:
- max_span_length = span_length
- max_span_font_type = span["font"]
- max_freq_font_type = max_span_font_type
- avg_font_size = sum(block_font_sizes) / len(block_font_sizes) if block_font_sizes else None
- avg_dir_horizontal = sum([dir[0] for dir in block_directions]) / len(block_directions) if block_directions else 0
- avg_dir_vertical = sum([dir[1] for dir in block_directions]) / len(block_directions) if block_directions else 0
- median_font_size = float(np.median(block_font_sizes)) if block_font_sizes else None
- return (
- X0,
- X1,
- avg_char_width,
- avg_char_height,
- max_freq_font_type,
- avg_font_size,
- (avg_dir_horizontal, avg_dir_vertical),
- median_font_size,
- )
- def __make_new_block(self, input_block):
- new_block = {}
- raw_lines = input_block["lines"]
- stats = self.__calc_stats_of_new_lines(raw_lines)
- block_id = input_block["block_id"]
- block_bbox = input_block["bbox"]
- block_text = input_block["text"]
- block_lines = raw_lines
- block_avg_left_boundary = stats[0]
- block_avg_right_boundary = stats[1]
- block_avg_char_width = stats[2]
- block_avg_char_height = stats[3]
- block_font_type = stats[4]
- block_font_size = stats[5]
- block_direction = stats[6]
- block_median_font_size = stats[7]
- new_block["block_id"] = block_id
- new_block["bbox"] = block_bbox
- new_block["text"] = block_text
- new_block["dir"] = block_direction
- new_block["X0"] = block_avg_left_boundary
- new_block["X1"] = block_avg_right_boundary
- new_block["avg_char_width"] = block_avg_char_width
- new_block["avg_char_height"] = block_avg_char_height
- new_block["block_font_type"] = block_font_type
- new_block["block_font_size"] = block_font_size
- new_block["lines"] = block_lines
- new_block["median_font_size"] = block_median_font_size
- return new_block
- def batch_process_blocks(self, pdf_dic):
- """
- This function processes the blocks in batch.
- Parameters
- ----------
- self : object
- The instance of the class.
- ----------
- blocks : list
- Input block is a list of raw blocks.
- Schema can refer to the value of key ""preproc_blocks".
- Returns
- -------
- result_dict : dict
- result dictionary
- """
- for page_id, blocks in pdf_dic.items():
- if page_id.startswith("page_"):
- para_blocks = []
- if "para_blocks" in blocks.keys():
- input_blocks = blocks["para_blocks"]
- for input_block in input_blocks:
- new_block = self.__make_new_block(input_block)
- para_blocks.append(new_block)
- blocks["para_blocks"] = para_blocks
- return pdf_dic
- class DocStatisticsCalculator:
- """
- This class calculates the statistics of the document.
- """
- def __init__(self) -> None:
- pass
- def calc_stats_of_doc(self, pdf_dict):
- """
- This function computes the statistics of the document
- Parameters
- ----------
- result_dict : dict
- result dictionary
- Returns
- -------
- statistics : dict
- statistics of the document
- """
- total_text_length = 0
- total_num_blocks = 0
- for page_id, blocks in pdf_dict.items():
- if page_id.startswith("page_"):
- if "para_blocks" in blocks.keys():
- para_blocks = blocks["para_blocks"]
- for para_block in para_blocks:
- total_text_length += len(para_block["text"])
- total_num_blocks += 1
- avg_text_length = total_text_length / total_num_blocks if total_num_blocks else 0
- font_list = []
- for page_id, blocks in pdf_dict.items():
- if page_id.startswith("page_"):
- if "para_blocks" in blocks.keys():
- input_blocks = blocks["para_blocks"]
- for input_block in input_blocks:
- block_text_length = len(input_block.get("text", ""))
- if block_text_length < avg_text_length * 0.5:
- continue
- block_font_type = safe_get(input_block, "block_font_type", "")
- block_font_size = safe_get(input_block, "block_font_size", 0)
- font_list.append((block_font_type, block_font_size))
- font_counter = Counter(font_list)
- most_common_font = font_counter.most_common(1)[0] if font_list else (("", 0), 0)
- second_most_common_font = font_counter.most_common(2)[1] if len(font_counter) > 1 else (("", 0), 0)
- statistics = {
- "num_pages": 0,
- "num_blocks": 0,
- "num_paras": 0,
- "num_titles": 0,
- "num_header_blocks": 0,
- "num_footer_blocks": 0,
- "num_watermark_blocks": 0,
- "num_vertical_margin_note_blocks": 0,
- "most_common_font_type": most_common_font[0][0],
- "most_common_font_size": most_common_font[0][1],
- "number_of_most_common_font": most_common_font[1],
- "second_most_common_font_type": second_most_common_font[0][0],
- "second_most_common_font_size": second_most_common_font[0][1],
- "number_of_second_most_common_font": second_most_common_font[1],
- "avg_text_length": avg_text_length,
- }
- for page_id, blocks in pdf_dict.items():
- if page_id.startswith("page_"):
- blocks = pdf_dict[page_id]["para_blocks"]
- statistics["num_pages"] += 1
- for block_id, block_data in enumerate(blocks):
- statistics["num_blocks"] += 1
- if "paras" in block_data.keys():
- statistics["num_paras"] += len(block_data["paras"])
- for line in block_data["lines"]:
- if line.get("is_title", 0):
- statistics["num_titles"] += 1
- if block_data.get("is_header", 0):
- statistics["num_header_blocks"] += 1
- if block_data.get("is_footer", 0):
- statistics["num_footer_blocks"] += 1
- if block_data.get("is_watermark", 0):
- statistics["num_watermark_blocks"] += 1
- if block_data.get("is_vertical_margin_note", 0):
- statistics["num_vertical_margin_note_blocks"] += 1
- pdf_dict["statistics"] = statistics
- return pdf_dict
- class TitleProcessor:
- """
- This class processes the title.
- """
- def __init__(self, *doc_statistics) -> None:
- if len(doc_statistics) > 0:
- self.doc_statistics = doc_statistics[0]
- self.nlp_model = NLPModels()
- self.MAX_TITLE_LEVEL = 3
- self.numbered_title_pattern = r"""
- ^ # 行首
- ( # 开始捕获组
- [\(\(]\d+[\)\)] # 括号内数字,支持中文和英文括号,例如:(1) 或 (1)
- |\d+[\)\)]\s # 数字后跟右括号和空格,支持中文和英文括号,例如:2) 或 2)
- |[\(\(][A-Z][\)\)] # 括号内大写字母,支持中文和英文括号,例如:(A) 或 (A)
- |[A-Z][\)\)]\s # 大写字母后跟右括号和空格,例如:A) 或 A)
- |[\(\(][IVXLCDM]+[\)\)] # 括号内罗马数字,支持中文和英文括号,例如:(I) 或 (I)
- |[IVXLCDM]+[\)\)]\s # 罗马数字后跟右括号和空格,例如:I) 或 I)
- |\d+(\.\d+)*\s # 数字或复合数字编号后跟空格,例如:1. 或 3.2.1
- |[一二三四五六七八九十百千]+[、\s] # 中文序号后跟顿号和空格,例如:一、
- |[\(|\(][一二三四五六七八九十百千]+[\)|\)]\s* # 中文括号内中文序号后跟空格,例如:(一)
- |[A-Z]\.\d+(\.\d+)?\s # 大写字母后跟点和数字,例如:A.1 或 A.1.1
- |[\(\(][a-z][\)\)] # 括号内小写字母,支持中文和英文括号,例如:(a) 或 (a)
- |[a-z]\)\s # 小写字母后跟右括号和空格,例如:a)
- |[A-Z]-\s # 大写字母后跟短横线和空格,例如:A-
- |\w+:\s # 英文序号词后跟冒号和空格,例如:First:
- |第[一二三四五六七八九十百千]+[章节部分条款]\s # 以“第”开头的中文标题后跟空格
- |[IVXLCDM]+\. # 罗马数字后跟点,例如:I.
- |\d+\.\s # 单个数字后跟点和空格,例如:1.
- ) # 结束捕获组
- .+ # 标题的其余部分
- """
- self.stage = (
- 0 # Used for distinguishing the stage of title detection, the number is occurred in paragraph process pipeline
- )
- def _is_potential_title(
- self,
- curr_line,
- prev_line,
- prev_line_is_title,
- next_line,
- avg_char_width,
- avg_char_height,
- median_font_size,
- ):
- """
- This function checks if the line is a potential title.
- Parameters
- ----------
- curr_line : dict
- current line
- prev_line : dict
- previous line
- next_line : dict
- next line
- avg_char_width : float
- average of char widths
- avg_char_height : float
- average of line heights
- Returns
- -------
- bool
- True if the line is a potential title, False otherwise.
- """
- def __is_line_centered(line_bbox, page_bbox, avg_char_width):
- """
- This function checks if the line is centered on the page
- Parameters
- ----------
- line_bbox : list
- bbox of the line
- page_bbox : list
- bbox of the page
- avg_char_width : float
- average of char widths
- Returns
- -------
- bool
- True if the line is centered on the page, False otherwise.
- """
- horizontal_ratio = 0.5
- horizontal_thres = horizontal_ratio * avg_char_width
- x0, _, x1, _ = line_bbox
- _, _, page_x1, _ = page_bbox
- return abs((x0 + x1) / 2 - page_x1 / 2) < horizontal_thres
- def __is_bold_font_line(line):
- """
- Check if a line contains any bold font style.
- """
- def _is_bold_span(span):
- # if span text is empty or only contains space, return False
- if not span["text"].strip():
- return False
- return bool(span["flags"] & 2**4) # Check if the font is bold
- for span in line["spans"]:
- if not _is_bold_span(span):
- return False
- return True
- def __is_italic_font_line(line):
- """
- Check if a line contains any italic font style.
- """
- def __is_italic_span(span):
- return bool(span["flags"] & 2**1) # Check if the font is italic
- for span in line["spans"]:
- if not __is_italic_span(span):
- return False
- return True
- def __is_punctuation_heavy(line_text):
- """
- Check if the line contains a high ratio of punctuation marks, which may indicate
- that the line is not a title.
- Parameters:
- line_text (str): Text of the line.
- Returns:
- bool: True if the line is heavy with punctuation, False otherwise.
- """
- # Pattern for common title format like "X.Y. Title"
- pattern = r"\b\d+\.\d+\..*\b"
- # If the line matches the title format, return False
- if re.match(pattern, line_text.strip()):
- return False
- # Find all punctuation marks in the line
- punctuation_marks = re.findall(r"[^\w\s]", line_text)
- number_of_punctuation_marks = len(punctuation_marks)
- text_length = len(line_text)
- if text_length == 0:
- return False
- punctuation_ratio = number_of_punctuation_marks / text_length
- if punctuation_ratio >= 0.1:
- return True
- return False
- def __has_mixed_font_styles(spans, strict_mode=False):
- """
- This function checks if the line has mixed font styles, the strict mode will compare the font types
- Parameters
- ----------
- spans : list
- spans of the line
- strict_mode : bool
- True for strict mode, the font types will be fully compared
- False for non-strict mode, the font types will be compared by the most longest common prefix
- Returns
- -------
- bool
- True if the line has mixed font styles, False otherwise.
- """
- if strict_mode:
- font_styles = set()
- for span in spans:
- font_style = span["font"].lower()
- font_styles.add(font_style)
- return len(font_styles) > 1
- else: # non-strict mode
- font_styles = []
- for span in spans:
- font_style = span["font"].lower()
- font_styles.append(font_style)
- if len(font_styles) > 1:
- longest_common_prefix = os.path.commonprefix(font_styles)
- if len(longest_common_prefix) > 0:
- return False
- else:
- return True
- else:
- return False
- def __is_different_font_type_from_neighbors(curr_line_font_type, prev_line_font_type, next_line_font_type):
- """
- This function checks if the current line has a different font type from the previous and next lines
- Parameters
- ----------
- curr_line_font_type : str
- font type of the current line
- prev_line_font_type : str
- font type of the previous line
- next_line_font_type : str
- font type of the next line
- Returns
- -------
- bool
- True if the current line has a different font type from the previous and next lines, False otherwise.
- """
- return all(
- curr_line_font_type != other_font_type.lower()
- for other_font_type in [prev_line_font_type, next_line_font_type]
- if other_font_type is not None
- )
- def __is_larger_font_size_from_neighbors(curr_line_font_size, prev_line_font_size, next_line_font_size):
- """
- This function checks if the current line has a larger font size than the previous and next lines
- Parameters
- ----------
- curr_line_font_size : float
- font size of the current line
- prev_line_font_size : float
- font size of the previous line
- next_line_font_size : float
- font size of the next line
- Returns
- -------
- bool
- True if the current line has a larger font size than the previous and next lines, False otherwise.
- """
- return all(
- curr_line_font_size > other_font_size * 1.2
- for other_font_size in [prev_line_font_size, next_line_font_size]
- if other_font_size is not None
- )
- def __is_similar_to_pre_line(curr_line_font_type, prev_line_font_type, curr_line_font_size, prev_line_font_size):
- """
- This function checks if the current line is similar to the previous line
- Parameters
- ----------
- curr_line : dict
- current line
- prev_line : dict
- previous line
- Returns
- -------
- bool
- True if the current line is similar to the previous line, False otherwise.
- """
- if curr_line_font_type == prev_line_font_type and curr_line_font_size == prev_line_font_size:
- return True
- else:
- return False
- def __is_same_font_type_of_docAvg(curr_line_font_type):
- """
- This function checks if the current line has the same font type as the document average font type
- Parameters
- ----------
- curr_line_font_type : str
- font type of the current line
- Returns
- -------
- bool
- True if the current line has the same font type as the document average font type, False otherwise.
- """
- doc_most_common_font_type = safe_get(self.doc_statistics, "most_common_font_type", "").lower()
- doc_second_most_common_font_type = safe_get(self.doc_statistics, "second_most_common_font_type", "").lower()
- return curr_line_font_type.lower() in [doc_most_common_font_type, doc_second_most_common_font_type]
- def __is_font_size_not_less_than_docAvg(curr_line_font_size, ratio: float = 1):
- """
- This function checks if the current line has a large enough font size
- Parameters
- ----------
- curr_line_font_size : float
- font size of the current line
- ratio : float
- ratio of the current line font size to the document average font size
- Returns
- -------
- bool
- True if the current line has a large enough font size, False otherwise.
- """
- doc_most_common_font_size = safe_get(self.doc_statistics, "most_common_font_size", 0)
- doc_second_most_common_font_size = safe_get(self.doc_statistics, "second_most_common_font_size", 0)
- doc_avg_font_size = min(doc_most_common_font_size, doc_second_most_common_font_size)
- return curr_line_font_size >= doc_avg_font_size * ratio
- def __is_sufficient_spacing_above_and_below(
- curr_line_bbox,
- prev_line_bbox,
- next_line_bbox,
- avg_char_height,
- median_font_size,
- ):
- """
- This function checks if the current line has sufficient spacing above and below
- Parameters
- ----------
- curr_line_bbox : list
- bbox of the current line
- prev_line_bbox : list
- bbox of the previous line
- next_line_bbox : list
- bbox of the next line
- avg_char_width : float
- average of char widths
- avg_char_height : float
- average of line heights
- Returns
- -------
- bool
- True if the current line has sufficient spacing above and below, False otherwise.
- """
- vertical_ratio = 1.25
- vertical_thres = vertical_ratio * median_font_size
- _, y0, _, y1 = curr_line_bbox
- sufficient_spacing_above = False
- if prev_line_bbox:
- vertical_spacing_above = min(y0 - prev_line_bbox[1], y1 - prev_line_bbox[3])
- sufficient_spacing_above = vertical_spacing_above > vertical_thres
- else:
- sufficient_spacing_above = True
- sufficient_spacing_below = False
- if next_line_bbox:
- vertical_spacing_below = min(next_line_bbox[1] - y0, next_line_bbox[3] - y1)
- sufficient_spacing_below = vertical_spacing_below > vertical_thres
- else:
- sufficient_spacing_below = True
- return (sufficient_spacing_above, sufficient_spacing_below)
- def __is_word_list_line_by_rules(curr_line_text):
- """
- This function checks if the current line is a word list
- Parameters
- ----------
- curr_line_text : str
- text of the current line
- Returns
- -------
- bool
- True if the current line is a name list, False otherwise.
- """
- # name_list_pattern = r"([a-zA-Z][a-zA-Z\s]{0,20}[a-zA-Z]|[\u4e00-\u9fa5·]{2,16})(?=[,,;;\s]|$)"
- name_list_pattern = r"(?<![\u4e00-\u9fa5])([A-Z][a-z]{0,19}\s[A-Z][a-z]{0,19}|[\u4e00-\u9fa5]{2,6})(?=[,,;;\s]|$)"
- compiled_pattern = re.compile(name_list_pattern)
- if compiled_pattern.search(curr_line_text):
- return True
- else:
- return False
- def __get_text_catgr_by_nlp(curr_line_text):
- """
- This function checks if the current line is a name list using nlp model, such as spacy
- Parameters
- ----------
- curr_line_text : str
- text of the current line
- Returns
- -------
- bool
- True if the current line is a name list, False otherwise.
- """
- result = self.nlp_model.detect_entity_catgr_using_nlp(curr_line_text)
- return result
- def __is_numbered_title(curr_line_text):
- """
- This function checks if the current line is a numbered list
- Parameters
- ----------
- curr_line_text : str
- text of the current line
- Returns
- -------
- bool
- True if the current line is a numbered list, False otherwise.
- """
- compiled_pattern = re.compile(self.numbered_title_pattern, re.VERBOSE)
- if compiled_pattern.search(curr_line_text):
- return True
- else:
- return False
- def __is_end_with_ending_puncs(line_text):
- """
- This function checks if the current line ends with a ending punctuation mark
- Parameters
- ----------
- line_text : str
- text of the current line
- Returns
- -------
- bool
- True if the current line ends with a punctuation mark, False otherwise.
- """
- end_puncs = [".", "?", "!", "。", "?", "!", "…"]
- line_text = line_text.rstrip()
- if line_text[-1] in end_puncs:
- return True
- return False
- def __contains_only_no_meaning_symbols(line_text):
- """
- This function checks if the current line contains only symbols that have no meaning, if so, it is not a title.
- Situation contains:
- 1. Only have punctuation marks
- 2. Only have other non-meaning symbols
- Parameters
- ----------
- line_text : str
- text of the current line
- Returns
- -------
- bool
- True if the current line contains only symbols that have no meaning, False otherwise.
- """
- punctuation_marks = re.findall(r"[^\w\s]", line_text) # find all punctuation marks
- number_of_punctuation_marks = len(punctuation_marks)
- text_length = len(line_text)
- if text_length == 0:
- return False
- punctuation_ratio = number_of_punctuation_marks / text_length
- if punctuation_ratio >= 0.9:
- return True
- return False
- def __is_equation(line_text):
- """
- This function checks if the current line is an equation.
- Parameters
- ----------
- line_text : str
- Returns
- -------
- bool
- True if the current line is an equation, False otherwise.
- """
- equation_reg = r"\$.*?\\overline.*?\$" # to match interline equations
- if re.search(equation_reg, line_text):
- return True
- else:
- return False
- def __is_title_by_len(text, max_length=200):
- """
- This function checks if the current line is a title by length.
- Parameters
- ----------
- text : str
- text of the current line
- max_length : int
- max length of the title
- Returns
- -------
- bool
- True if the current line is a title, False otherwise.
- """
- text = text.strip()
- return len(text) <= max_length
- def __compute_line_font_type_and_size(curr_line):
- """
- This function computes the font type and font size of the line.
- Parameters
- ----------
- line : dict
- line
- Returns
- -------
- font_type : str
- font type of the line
- font_size : float
- font size of the line
- """
- spans = curr_line["spans"]
- max_accumulated_length = 0
- max_span_font_size = curr_line["spans"][0]["size"] # default value, float type
- max_span_font_type = curr_line["spans"][0]["font"].lower() # default value, string type
- for span in spans:
- if span["text"].isspace():
- continue
- span_length = span["bbox"][2] - span["bbox"][0]
- if span_length > max_accumulated_length:
- max_accumulated_length = span_length
- max_span_font_size = span["size"]
- max_span_font_type = span["font"].lower()
- return max_span_font_type, max_span_font_size
- def __is_a_consistent_sub_title(pre_line, curr_line):
- """
- This function checks if the current line is a consistent sub title.
- Parameters
- ----------
- pre_line : dict
- previous line
- curr_line : dict
- current line
- Returns
- -------
- bool
- True if the current line is a consistent sub title, False otherwise.
- """
- if pre_line is None:
- return False
- start_letter_of_pre_line = pre_line["text"][0]
- start_letter_of_curr_line = curr_line["text"][0]
- has_same_prefix_digit = (
- start_letter_of_pre_line.isdigit()
- and start_letter_of_curr_line.isdigit()
- and start_letter_of_pre_line == start_letter_of_curr_line
- )
- # prefix text of curr_line satisfies the following title format: x.x
- prefix_text_pattern = r"^\d+\.\d+"
- subtitle_format_match = re.match(prefix_text_pattern, curr_line["text"])
- if subtitle_format_match:
- has_subtitle_format = True
- else:
- has_subtitle_format = False
- if has_same_prefix_digit or has_subtitle_format:
- print("is a consistent sub title")
- return True
- """
- Title detecting main Process.
- """
- """
- Basic features about the current line.
- """
- curr_line_bbox = curr_line["bbox"]
- curr_line_text = curr_line["text"]
- curr_line_font_type, curr_line_font_size = __compute_line_font_type_and_size(curr_line)
- if len(curr_line_text.strip()) == 0: # skip empty lines
- return False, False
- prev_line_bbox = prev_line["bbox"] if prev_line else None
- if prev_line:
- prev_line_font_type, prev_line_font_size = __compute_line_font_type_and_size(prev_line)
- else:
- prev_line_font_type, prev_line_font_size = None, None
- next_line_bbox = next_line["bbox"] if next_line else None
- if next_line:
- next_line_font_type, next_line_font_size = __compute_line_font_type_and_size(next_line)
- else:
- next_line_font_type, next_line_font_size = None, None
- """
- Aggregated features about the current line.
- """
- is_italc_font = __is_italic_font_line(curr_line)
- is_bold_font = __is_bold_font_line(curr_line)
- is_font_size_little_less_than_doc_avg = __is_font_size_not_less_than_docAvg(curr_line_font_size, ratio=0.8)
- is_font_size_not_less_than_doc_avg = __is_font_size_not_less_than_docAvg(curr_line_font_size, ratio=1)
- is_much_larger_font_than_doc_avg = __is_font_size_not_less_than_docAvg(curr_line_font_size, ratio=1.6)
- is_not_same_font_type_of_docAvg = not __is_same_font_type_of_docAvg(curr_line_font_type)
- is_potential_title_font = is_bold_font or is_font_size_not_less_than_doc_avg or is_not_same_font_type_of_docAvg
- is_mix_font_styles_strict = __has_mixed_font_styles(curr_line["spans"], strict_mode=True)
- is_mix_font_styles_loose = __has_mixed_font_styles(curr_line["spans"], strict_mode=False)
- is_punctuation_heavy = __is_punctuation_heavy(curr_line_text)
- is_word_list_line_by_rules = __is_word_list_line_by_rules(curr_line_text)
- is_person_or_org_list_line_by_nlp = __get_text_catgr_by_nlp(curr_line_text) in ["PERSON", "GPE", "ORG"]
- is_font_size_larger_than_neighbors = __is_larger_font_size_from_neighbors(
- curr_line_font_size, prev_line_font_size, next_line_font_size
- )
- is_font_type_diff_from_neighbors = __is_different_font_type_from_neighbors(
- curr_line_font_type, prev_line_font_type, next_line_font_type
- )
- has_sufficient_spaces_above, has_sufficient_spaces_below = __is_sufficient_spacing_above_and_below(
- curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_height, median_font_size
- )
- is_similar_to_pre_line = __is_similar_to_pre_line(
- curr_line_font_type, prev_line_font_type, curr_line_font_size, prev_line_font_size
- )
- is_consistent_sub_title = __is_a_consistent_sub_title(prev_line, curr_line)
- """
- Further aggregated features about the current line.
-
- Attention:
- Features that start with __ are for internal use.
- """
- __is_line_left_aligned_from_neighbors = is_line_left_aligned_from_neighbors(
- curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width
- )
- __is_font_diff_from_neighbors = is_font_size_larger_than_neighbors or is_font_type_diff_from_neighbors
- is_a_left_inline_title = (
- is_mix_font_styles_strict and __is_line_left_aligned_from_neighbors and __is_font_diff_from_neighbors
- )
- is_title_by_check_prev_line = prev_line is None and has_sufficient_spaces_above and is_potential_title_font
- is_title_by_check_next_line = next_line is None and has_sufficient_spaces_below and is_potential_title_font
- is_title_by_check_pre_and_next_line = (
- (prev_line is not None or next_line is not None)
- and has_sufficient_spaces_above
- and has_sufficient_spaces_below
- and is_potential_title_font
- )
- is_numbered_title = __is_numbered_title(curr_line_text) and (
- (has_sufficient_spaces_above or prev_line is None) and (has_sufficient_spaces_below or next_line is None)
- )
- is_not_end_with_ending_puncs = not __is_end_with_ending_puncs(curr_line_text)
- is_not_only_no_meaning_symbols = not __contains_only_no_meaning_symbols(curr_line_text)
- is_equation = __is_equation(curr_line_text)
- is_title_by_len = __is_title_by_len(curr_line_text)
- """
- Decide if the line is a title.
- """
- is_title = (
- is_not_end_with_ending_puncs # not end with ending punctuation marks
- and is_not_only_no_meaning_symbols # not only have no meaning symbols
- and is_title_by_len # is a title by length, default max length is 200
- and not is_equation # an interline equation should never be a title
- and is_potential_title_font # is a potential title font, which is bold or larger than the document average font size or not the same font type as the document average font type
- and (
- (is_not_same_font_type_of_docAvg and is_font_size_not_less_than_doc_avg)
- or (is_bold_font and is_much_larger_font_than_doc_avg and is_not_same_font_type_of_docAvg)
- or (
- is_much_larger_font_than_doc_avg
- and (is_title_by_check_prev_line or is_title_by_check_next_line or is_title_by_check_pre_and_next_line)
- )
- or (
- is_font_size_little_less_than_doc_avg
- and is_bold_font
- and (is_title_by_check_prev_line or is_title_by_check_next_line or is_title_by_check_pre_and_next_line)
- )
- ) # not the same font type as the document average font type, which includes the most common font type and the second most common font type
- and (
- (
- not is_person_or_org_list_line_by_nlp
- and (
- is_much_larger_font_than_doc_avg
- or (is_not_same_font_type_of_docAvg and is_font_size_not_less_than_doc_avg)
- )
- )
- or (
- not (is_word_list_line_by_rules and is_person_or_org_list_line_by_nlp)
- and not is_a_left_inline_title
- and not is_punctuation_heavy
- and (is_title_by_check_prev_line or is_title_by_check_next_line or is_title_by_check_pre_and_next_line)
- )
- or (
- is_person_or_org_list_line_by_nlp
- and (is_bold_font and is_much_larger_font_than_doc_avg and is_not_same_font_type_of_docAvg)
- and (is_bold_font and is_much_larger_font_than_doc_avg and is_not_same_font_type_of_docAvg)
- )
- or (is_numbered_title and not is_a_left_inline_title)
- )
- # )
- ) or (prev_line_is_title and is_consistent_sub_title)
- is_name_or_org_list_to_be_removed = (
- (is_person_or_org_list_line_by_nlp)
- and is_punctuation_heavy
- and (is_title_by_check_prev_line or is_title_by_check_next_line or is_title_by_check_pre_and_next_line)
- ) and not is_title
- if is_name_or_org_list_to_be_removed:
- is_author_or_org_list = True
- else:
- is_author_or_org_list = False
- # return is_title, is_author_or_org_list
- # """
- """
- # print reason why the line is a title
- if is_title:
- print_green("This line is a title.")
- print_green("↓" * 10)
- print()
- print("curr_line_text: ", curr_line_text)
- print()
- print(f"prev_line_is_title: {prev_line_is_title}")
- print()
- print(f"is_consistent_sub_title: {is_consistent_sub_title}")
- """
- # print reason why the line is not a title
- # line_text = curr_line_text.strip()
- # test_text = "Career/Personal Life"
- # text_content_condition = line_text == test_text
- # if not is_title and text_content_condition: # Print specific line
- """
- if not is_title: # Print each line
- print_red("This line is not a title.")
- print_red("↓" * 10)
- print()
- print("curr_line_text: ", curr_line_text)
- print()
- if is_not_end_with_ending_puncs:
- print_green(f"is_not_end_with_ending_puncs")
- else:
- print_red(f"is_end_with_ending_puncs")
- if is_not_only_no_meaning_symbols:
- print_green(f"is_not_only_no_meaning_symbols")
- else:
- print_red(f"is_only_no_meaning_symbols")
- if is_title_by_len:
- print_green(f"is_title_by_len: {is_title_by_len}")
- else:
- print_red(f"is_not_title_by_len: {is_title_by_len}")
- if is_equation:
- print_red(f"is_equation")
- else:
- print_green(f"is_not_equation")
- if is_potential_title_font:
- print_green(f"is_potential_title_font")
- else:
- print_red(f"is_not_potential_title_font")
- if is_punctuation_heavy:
- print_red("is_punctuation_heavy")
- else:
- print_green("is_not_punctuation_heavy")
- if is_bold_font:
- print_green(f"is_bold_font")
- else:
- print_red(f"is_not_bold_font")
- if is_font_size_not_less_than_doc_avg:
- print_green(f"is_larger_font_than_doc_avg")
- else:
- print_red(f"is_not_larger_font_than_doc_avg")
- if is_much_larger_font_than_doc_avg:
- print_green(f"is_much_larger_font_than_doc_avg")
- else:
- print_red(f"is_not_much_larger_font_than_doc_avg")
- if is_not_same_font_type_of_docAvg:
- print_green(f"is_not_same_font_type_of_docAvg")
- else:
- print_red(f"is_same_font_type_of_docAvg")
- if is_word_list_line_by_rules:
- print_red("is_word_list_line_by_rules")
- else:
- print_green("is_not_name_list_by_rules")
- if is_person_or_org_list_line_by_nlp:
- print_red("is_person_or_org_list_line_by_nlp")
- else:
- print_green("is_not_person_or_org_list_line_by_nlp")
- if not is_numbered_title:
- print_red("is_not_numbered_title")
- else:
- print_green("is_numbered_title")
- if is_a_left_inline_title:
- print_red("is_a_left_inline_title")
- else:
- print_green("is_not_a_left_inline_title")
- if not is_title_by_check_prev_line:
- print_red("is_not_title_by_check_prev_line")
- else:
- print_green("is_title_by_check_prev_line")
- if not is_title_by_check_next_line:
- print_red("is_not_title_by_check_next_line")
- else:
- print_green("is_title_by_check_next_line")
- if not is_title_by_check_pre_and_next_line:
- print_red("is_not_title_by_check_pre_and_next_line")
- else:
- print_green("is_title_by_check_pre_and_next_line")
- # print_green("Common features:")
- # print_green("↓" * 10)
- # print(f" curr_line_font_type: {curr_line_font_type}")
- # print(f" curr_line_font_size: {curr_line_font_size}")
- # print()
- """
- # """
- return is_title, is_author_or_org_list
- def _detect_title(self, curr_block, pre_block):
- """
- Use the functions 'is_potential_title' to detect titles of each paragraph block.
- If a line is a title, then the value of key 'is_title' of the line will be set to True.
- """
- raw_lines = curr_block["lines"]
- blk_avg_char_width = curr_block["avg_char_width"]
- blk_avg_char_height = curr_block["avg_char_height"]
- blk_media_font_size = curr_block["median_font_size"]
- if self.stage == 0:
- is_prev_line_a_title = False
- for i, curr_line in enumerate(raw_lines):
- prev_line = raw_lines[i - 1] if i > 0 else None
- next_line = raw_lines[i + 1] if i < len(raw_lines) - 1 else None
- is_line_a_title, is_line_an_entities_list = self._is_potential_title(
- curr_line,
- prev_line,
- is_prev_line_a_title,
- next_line,
- blk_avg_char_width,
- blk_avg_char_height,
- blk_media_font_size,
- )
- if is_line_a_title:
- curr_line["is_title"] = is_line_a_title
- is_prev_line_a_title = True # set the flag to True for the next line
- else:
- curr_line["is_title"] = False
- is_prev_line_a_title = False # set the flag to False for the next line
- if is_line_an_entities_list:
- curr_line["is_author_or_org_list"] = is_line_an_entities_list
- else:
- curr_line["is_author_or_org_list"] = False
- return curr_block
- if self.stage == 1: # Check the block consistent titles.
- if pre_block and "paras" in pre_block.keys():
- print_red(f"Checking cross block title...")
- last_para_content = None
- paras_of_pre_block = pre_block["paras"]
- last_key = sorted(paras_of_pre_block.keys())[-1]
- last_para_content = paras_of_pre_block[last_key]
- if last_para_content is not None:
- last_line_of_last_para_of_last_block = pre_block["lines"][-1]
- first_line_of_curr_block = raw_lines[0]
- next_line_of_curr_block = raw_lines[1] if len(raw_lines) > 1 else None
- is_line_a_title, is_line_an_entities_list = self._is_potential_title(
- first_line_of_curr_block,
- last_line_of_last_para_of_last_block,
- last_line_of_last_para_of_last_block["is_title"],
- next_line_of_curr_block,
- blk_avg_char_width,
- blk_avg_char_height,
- blk_media_font_size,
- )
- if is_line_a_title:
- first_line_of_curr_block["is_title"] = is_line_a_title
- else:
- first_line_of_curr_block["is_title"] = False
- if is_line_an_entities_list:
- first_line_of_curr_block["is_author_or_org_list"] = is_line_an_entities_list
- else:
- first_line_of_curr_block["is_author_or_org_list"] = False
- # print(f"first_line_of_curr_block: {first_line_of_curr_block['text']}")
- # print(f"last_line_of_pre_block: {last_line_of_last_para['text']}")
- return curr_block
- else:
- print_red(f"last_para_content is None")
- def batch_detect_titles(self, pdf_dic):
- """
- This function batch process the blocks to detect titles.
- Parameters
- ----------
- pdf_dict : dict
- result dictionary
- Returns
- -------
- pdf_dict : dict
- result dictionary
- """
- num_titles = 0
- for page_id, page_content in pdf_dic.items():
- if page_id.startswith("page_"):
- para_blocks = []
- if "para_blocks" in page_content.keys():
- para_blocks = page_content["para_blocks"]
- all_single_line_blocks = []
- for block in para_blocks:
- if len(block["lines"]) == 1:
- all_single_line_blocks.append(block)
- new_para_blocks = []
- if not len(all_single_line_blocks) == len(para_blocks): # Not all blocks are single line blocks.
- for para_idx, para_block in enumerate(para_blocks):
- print(f"______________________________________________________")
- print(f"page_id: {page_id}")
- print(f"para_block id: {para_block['block_id']}")
- print(f"para_idx: {para_idx}")
- pre_block = para_blocks[para_idx - 1] if para_idx > 0 else None
- curr_block = para_block
- print_yellow(f"text of current block: {curr_block['text'] if curr_block else None}")
- print_green(f"text of previous block: {pre_block['text'] if pre_block else None}")
- new_block = self._detect_title(curr_block, pre_block)
- new_para_blocks.append(new_block)
- # num_titles += sum([line.get("is_title", 0) for line in new_block["lines"]])
- if new_block is not None:
- num_titles += sum([line.get("is_title", 0) for line in new_block["lines"]])
- else:
- num_titles += 0
- else: # All blocks are single line blocks.
- for para_block in para_blocks:
- new_para_blocks.append(para_block)
- num_titles += sum([line.get("is_title", 0) for line in para_block["lines"]])
- para_blocks = new_para_blocks
- page_content["para_blocks"] = para_blocks
- for para_block in para_blocks:
- if para_block is not None:
- all_titles = all(safe_get(line, "is_title", False) for line in para_block["lines"])
- para_text_len = sum([len(line["text"]) for line in para_block["lines"]])
- if (
- all_titles and para_text_len < 200
- ): # total length of the paragraph is less than 200, more than this should not be a title
- para_block["is_block_title"] = 1
- else:
- para_block["is_block_title"] = 0
- all_name_or_org_list_to_be_removed = all(
- safe_get(line, "is_author_or_org_list", False) for line in para_block["lines"]
- )
- if all_name_or_org_list_to_be_removed and page_id == "page_0":
- para_block["is_block_an_author_or_org_list"] = 1
- else:
- para_block["is_block_an_author_or_org_list"] = 0
- else:
- all_titles = False
- # para_block["is_block_title"] = 0
- # para_block["is_block_an_author_or_org_list"] = 0
- # page_content["para_blocks"] = para_blocks
- pdf_dic["statistics"]["num_titles"] = num_titles
- return pdf_dic
- def _recog_title_level(self, title_blocks):
- """
- This function determines the title level based on the font size of the title.
- Parameters
- ----------
- title_blocks : list
- Returns
- -------
- title_blocks : list
- """
- font_sizes = np.array([safe_get(tb["block"], "block_font_size", 0) for tb in title_blocks])
- # Use the mean and std of font sizes to remove extreme values
- mean_font_size = np.mean(font_sizes)
- std_font_size = np.std(font_sizes)
- min_extreme_font_size = mean_font_size - std_font_size # type: ignore
- max_extreme_font_size = mean_font_size + std_font_size # type: ignore
- # Compute the threshold for title level
- middle_font_sizes = font_sizes[(font_sizes > min_extreme_font_size) & (font_sizes < max_extreme_font_size)]
- if middle_font_sizes.size > 0:
- middle_mean_font_size = np.mean(middle_font_sizes)
- level_threshold = middle_mean_font_size
- else:
- level_threshold = mean_font_size
- for tb in title_blocks:
- title_block = tb["block"]
- title_font_size = safe_get(title_block, "block_font_size", 0)
- current_level = 1 # Initialize title level, the biggest level is 1
- # print(f"Before adjustment by font size, {current_level}")
- if title_font_size >= max_extreme_font_size:
- current_level = 1
- elif title_font_size <= min_extreme_font_size:
- current_level = 3
- elif float(title_font_size) >= float(level_threshold):
- current_level = 2
- else:
- current_level = 3
- # print(f"After adjustment by font size, {current_level}")
- title_block["block_title_level"] = current_level
- return title_blocks
- def batch_recog_title_level(self, pdf_dic):
- """
- This function batch process the blocks to recognize title level.
- Parameters
- ----------
- pdf_dict : dict
- result dictionary
- Returns
- -------
- pdf_dict : dict
- result dictionary
- """
- title_blocks = []
- # Collect all titles
- for page_id, blocks in pdf_dic.items():
- if page_id.startswith("page_"):
- para_blocks = blocks.get("para_blocks", [])
- for block in para_blocks:
- if block.get("is_block_title"):
- title_obj = {"page_id": page_id, "block": block}
- title_blocks.append(title_obj)
- # Determine title level
- if title_blocks:
- # Determine title level based on font size
- title_blocks = self._recog_title_level(title_blocks)
- return pdf_dic
- class BlockTerminationProcessor:
- """
- This class is used to process the block termination.
- """
- def __init__(self) -> None:
- pass
- def _is_consistent_lines(
- self,
- curr_line,
- prev_line,
- next_line,
- consistent_direction, # 0 for prev, 1 for next, 2 for both
- ):
- """
- This function checks if the line is consistent with its neighbors
- Parameters
- ----------
- curr_line : dict
- current line
- prev_line : dict
- previous line
- next_line : dict
- next line
- consistent_direction : int
- 0 for prev, 1 for next, 2 for both
- Returns
- -------
- bool
- True if the line is consistent with its neighbors, False otherwise.
- """
- curr_line_font_size = curr_line["spans"][0]["size"]
- curr_line_font_type = curr_line["spans"][0]["font"].lower()
- if consistent_direction == 0:
- if prev_line:
- prev_line_font_size = prev_line["spans"][0]["size"]
- prev_line_font_type = prev_line["spans"][0]["font"].lower()
- return curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type
- else:
- return False
- elif consistent_direction == 1:
- if next_line:
- next_line_font_size = next_line["spans"][0]["size"]
- next_line_font_type = next_line["spans"][0]["font"].lower()
- return curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
- else:
- return False
- elif consistent_direction == 2:
- if prev_line and next_line:
- prev_line_font_size = prev_line["spans"][0]["size"]
- prev_line_font_type = prev_line["spans"][0]["font"].lower()
- next_line_font_size = next_line["spans"][0]["size"]
- next_line_font_type = next_line["spans"][0]["font"].lower()
- return (curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type) and (
- curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
- )
- else:
- return False
- else:
- return False
- def _is_regular_line(self, curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_line_height):
- """
- This function checks if the line is a regular line
- Parameters
- ----------
- curr_line_bbox : list
- bbox of the current line
- prev_line_bbox : list
- bbox of the previous line
- next_line_bbox : list
- bbox of the next line
- avg_char_width : float
- average of char widths
- X0 : float
- median of x0 values, which represents the left average boundary of the page
- X1 : float
- median of x1 values, which represents the right average boundary of the page
- avg_line_height : float
- average of line heights
- Returns
- -------
- bool
- True if the line is a regular line, False otherwise.
- """
- horizontal_ratio = 0.5
- vertical_ratio = 0.5
- horizontal_thres = horizontal_ratio * avg_char_width
- vertical_thres = vertical_ratio * avg_line_height
- x0, y0, x1, y1 = curr_line_bbox
- x0_near_X0 = abs(x0 - X0) < horizontal_thres
- x1_near_X1 = abs(x1 - X1) < horizontal_thres
- prev_line_is_end_of_para = prev_line_bbox and (abs(prev_line_bbox[2] - X1) > avg_char_width)
- sufficient_spacing_above = False
- if prev_line_bbox:
- vertical_spacing_above = y1 - prev_line_bbox[3]
- sufficient_spacing_above = vertical_spacing_above > vertical_thres
- sufficient_spacing_below = False
- if next_line_bbox:
- vertical_spacing_below = next_line_bbox[1] - y0
- sufficient_spacing_below = vertical_spacing_below > vertical_thres
- return (
- (sufficient_spacing_above or sufficient_spacing_below)
- or (not x0_near_X0 and not x1_near_X1)
- or prev_line_is_end_of_para
- )
- def _is_possible_start_of_para(self, curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size):
- """
- This function checks if the line is a possible start of a paragraph
- Parameters
- ----------
- curr_line : dict
- current line
- prev_line : dict
- previous line
- next_line : dict
- next line
- X0 : float
- median of x0 values, which represents the left average boundary of the page
- X1 : float
- median of x1 values, which represents the right average boundary of the page
- avg_char_width : float
- average of char widths
- avg_line_height : float
- average of line heights
- Returns
- -------
- bool
- True if the line is a possible start of a paragraph, False otherwise.
- """
- start_confidence = 0.5 # Initial confidence of the line being a start of a paragraph
- decision_path = [] # Record the decision path
- curr_line_bbox = curr_line["bbox"]
- prev_line_bbox = prev_line["bbox"] if prev_line else None
- next_line_bbox = next_line["bbox"] if next_line else None
- indent_ratio = 1
- vertical_ratio = 1.5
- vertical_thres = vertical_ratio * avg_font_size
- left_horizontal_ratio = 0.5
- left_horizontal_thres = left_horizontal_ratio * avg_char_width
- right_horizontal_ratio = 2.5
- right_horizontal_thres = right_horizontal_ratio * avg_char_width
- x0, y0, x1, y1 = curr_line_bbox
- indent_condition = x0 > X0 + indent_ratio * avg_char_width
- if indent_condition:
- start_confidence += 0.2
- decision_path.append("indent_condition_met")
- x0_near_X0 = abs(x0 - X0) < left_horizontal_thres
- if x0_near_X0:
- start_confidence += 0.1
- decision_path.append("x0_near_X0")
- x1_near_X1 = abs(x1 - X1) < right_horizontal_thres
- if x1_near_X1:
- start_confidence += 0.1
- decision_path.append("x1_near_X1")
- if prev_line is None:
- prev_line_is_end_of_para = True
- start_confidence += 0.2
- decision_path.append("no_prev_line")
- else:
- prev_line_is_end_of_para, _, _ = self._is_possible_end_of_para(prev_line, next_line, X0, X1, avg_char_width)
- if prev_line_is_end_of_para:
- start_confidence += 0.1
- decision_path.append("prev_line_is_end_of_para")
- sufficient_spacing_above = False
- if prev_line_bbox:
- vertical_spacing_above = y1 - prev_line_bbox[3]
- sufficient_spacing_above = vertical_spacing_above > vertical_thres
- if sufficient_spacing_above:
- start_confidence += 0.2
- decision_path.append("sufficient_spacing_above")
- sufficient_spacing_below = False
- if next_line_bbox:
- vertical_spacing_below = next_line_bbox[1] - y0
- sufficient_spacing_below = vertical_spacing_below > vertical_thres
- if sufficient_spacing_below:
- start_confidence += 0.2
- decision_path.append("sufficient_spacing_below")
- is_regular_line = self._is_regular_line(
- curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_font_size
- )
- if is_regular_line:
- start_confidence += 0.1
- decision_path.append("is_regular_line")
- is_start_of_para = (
- (sufficient_spacing_above or sufficient_spacing_below)
- or (indent_condition)
- or (not indent_condition and x0_near_X0 and x1_near_X1 and not is_regular_line)
- or prev_line_is_end_of_para
- )
- return (is_start_of_para, start_confidence, decision_path)
- def _is_possible_end_of_para(self, curr_line, next_line, X0, X1, avg_char_width):
- """
- This function checks if the line is a possible end of a paragraph
- Parameters
- ----------
- curr_line : dict
- current line
- next_line : dict
- next line
- X0 : float
- median of x0 values, which represents the left average boundary of the page
- X1 : float
- median of x1 values, which represents the right average boundary of the page
- avg_char_width : float
- average of char widths
- Returns
- -------
- bool
- True if the line is a possible end of a paragraph, False otherwise.
- """
- end_confidence = 0.5 # Initial confidence of the line being a end of a paragraph
- decision_path = [] # Record the decision path
- curr_line_bbox = curr_line["bbox"]
- next_line_bbox = next_line["bbox"] if next_line else None
- left_horizontal_ratio = 0.5
- right_horizontal_ratio = 0.5
- x0, _, x1, y1 = curr_line_bbox
- next_x0, next_y0, _, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
- x0_near_X0 = abs(x0 - X0) < left_horizontal_ratio * avg_char_width
- if x0_near_X0:
- end_confidence += 0.1
- decision_path.append("x0_near_X0")
- x1_smaller_than_X1 = x1 < X1 - right_horizontal_ratio * avg_char_width
- if x1_smaller_than_X1:
- end_confidence += 0.1
- decision_path.append("x1_smaller_than_X1")
- next_line_is_start_of_para = (
- next_line_bbox
- and (next_x0 > X0 + left_horizontal_ratio * avg_char_width)
- and (not is_line_left_aligned_from_neighbors(curr_line_bbox, None, next_line_bbox, avg_char_width, direction=1))
- )
- if next_line_is_start_of_para:
- end_confidence += 0.2
- decision_path.append("next_line_is_start_of_para")
- is_line_left_aligned_from_neighbors_bool = is_line_left_aligned_from_neighbors(
- curr_line_bbox, None, next_line_bbox, avg_char_width
- )
- if is_line_left_aligned_from_neighbors_bool:
- end_confidence += 0.1
- decision_path.append("line_is_left_aligned_from_neighbors")
- is_line_right_aligned_from_neighbors_bool = is_line_right_aligned_from_neighbors(
- curr_line_bbox, None, next_line_bbox, avg_char_width
- )
- if not is_line_right_aligned_from_neighbors_bool:
- end_confidence += 0.1
- decision_path.append("line_is_not_right_aligned_from_neighbors")
- is_end_of_para = end_with_punctuation(curr_line["text"]) and (
- (x0_near_X0 and x1_smaller_than_X1)
- or (is_line_left_aligned_from_neighbors_bool and not is_line_right_aligned_from_neighbors_bool)
- )
- return (is_end_of_para, end_confidence, decision_path)
- def _cut_paras_per_block(
- self,
- block,
- ):
- """
- Processes a raw block from PyMuPDF and returns the processed block.
- Parameters
- ----------
- raw_block : dict
- A raw block from pymupdf.
- Returns
- -------
- processed_block : dict
- """
- def _construct_para(lines, is_block_title, para_title_level):
- """
- Construct a paragraph from given lines.
- """
- font_sizes = [span["size"] for line in lines for span in line["spans"]]
- avg_font_size = sum(font_sizes) / len(font_sizes) if font_sizes else 0
- font_colors = [span["color"] for line in lines for span in line["spans"]]
- most_common_font_color = max(set(font_colors), key=font_colors.count) if font_colors else None
- font_type_lengths = {}
- for line in lines:
- for span in line["spans"]:
- font_type = span["font"]
- bbox_width = span["bbox"][2] - span["bbox"][0]
- if font_type in font_type_lengths:
- font_type_lengths[font_type] += bbox_width
- else:
- font_type_lengths[font_type] = bbox_width
- # get the font type with the longest bbox width
- most_common_font_type = max(font_type_lengths, key=font_type_lengths.get) if font_type_lengths else None # type: ignore
- para_bbox = calculate_para_bbox(lines)
- para_text = " ".join(line["text"] for line in lines)
- return {
- "para_bbox": para_bbox,
- "para_text": para_text,
- "para_font_type": most_common_font_type,
- "para_font_size": avg_font_size,
- "para_font_color": most_common_font_color,
- "is_para_title": is_block_title,
- "para_title_level": para_title_level,
- }
- block_bbox = block["bbox"]
- block_text = block["text"]
- block_lines = block["lines"]
- X0 = safe_get(block, "X0", 0)
- X1 = safe_get(block, "X1", 0)
- avg_char_width = safe_get(block, "avg_char_width", 0)
- avg_char_height = safe_get(block, "avg_char_height", 0)
- avg_font_size = safe_get(block, "avg_font_size", 0)
- is_block_title = safe_get(block, "is_block_title", False)
- para_title_level = safe_get(block, "block_title_level", 0)
- # Segment into paragraphs
- para_ranges = []
- in_paragraph = False
- start_idx_of_para = None
- # Create the processed paragraphs
- processed_paras = {}
- para_bboxes = []
- end_idx_of_para = 0
- for line_index, line in enumerate(block_lines):
- curr_line = line
- prev_line = block_lines[line_index - 1] if line_index > 0 else None
- next_line = block_lines[line_index + 1] if line_index < len(block_lines) - 1 else None
- """
- Start processing paragraphs.
- """
- # Check if the line is the start of a paragraph
- is_start_of_para, start_confidence, decision_path = self._is_possible_start_of_para(
- curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size
- )
- if not in_paragraph and is_start_of_para:
- in_paragraph = True
- start_idx_of_para = line_index
- # print_green(">>> Start of a paragraph")
- # print(" curr_line_text: ", curr_line["text"])
- # print(" start_confidence: ", start_confidence)
- # print(" decision_path: ", decision_path)
- # Check if the line is the end of a paragraph
- is_end_of_para, end_confidence, decision_path = self._is_possible_end_of_para(
- curr_line, next_line, X0, X1, avg_char_width
- )
- if in_paragraph and (is_end_of_para or not next_line):
- para_ranges.append((start_idx_of_para, line_index))
- start_idx_of_para = None
- in_paragraph = False
- # print_red(">>> End of a paragraph")
- # print(" curr_line_text: ", curr_line["text"])
- # print(" end_confidence: ", end_confidence)
- # print(" decision_path: ", decision_path)
- # Add the last paragraph if it is not added
- if in_paragraph and start_idx_of_para is not None:
- para_ranges.append((start_idx_of_para, len(block_lines) - 1))
- # Process the matched paragraphs
- for para_index, (start_idx, end_idx) in enumerate(para_ranges):
- matched_lines = block_lines[start_idx : end_idx + 1]
- para_properties = _construct_para(matched_lines, is_block_title, para_title_level)
- para_key = f"para_{len(processed_paras)}"
- processed_paras[para_key] = para_properties
- para_bboxes.append(para_properties["para_bbox"])
- end_idx_of_para = end_idx + 1
- # Deal with the remaining lines
- if end_idx_of_para < len(block_lines):
- unmatched_lines = block_lines[end_idx_of_para:]
- unmatched_properties = _construct_para(unmatched_lines, is_block_title, para_title_level)
- unmatched_key = f"para_{len(processed_paras)}"
- processed_paras[unmatched_key] = unmatched_properties
- para_bboxes.append(unmatched_properties["para_bbox"])
- block["paras"] = processed_paras
- return block
- def batch_process_blocks(self, pdf_dict):
- """
- Parses the blocks of all pages.
- Parameters
- ----------
- pdf_dict : dict
- PDF dictionary.
- filter_blocks : list
- List of bounding boxes to filter.
- Returns
- -------
- result_dict : dict
- Result dictionary.
- """
- num_paras = 0
- for page_id, page in pdf_dict.items():
- if page_id.startswith("page_"):
- para_blocks = []
- if "para_blocks" in page.keys():
- input_blocks = page["para_blocks"]
- for input_block in input_blocks:
- new_block = self._cut_paras_per_block(input_block)
- para_blocks.append(new_block)
- num_paras += len(new_block["paras"])
- page["para_blocks"] = para_blocks
- pdf_dict["statistics"]["num_paras"] = num_paras
- return pdf_dict
- class BlockContinuationProcessor:
- """
- This class is used to process the blocks to detect block continuations.
- """
- def __init__(self) -> None:
- pass
- def __is_similar_font_type(self, font_type_1, font_type_2, prefix_length_ratio=0.3):
- """
- This function checks if the two font types are similar.
- Definition of similar font types: the two font types have a common prefix,
- and the length of the common prefix is at least a certain ratio of the length of the shorter font type.
- Parameters
- ----------
- font_type1 : str
- font type 1
- font_type2 : str
- font type 2
- prefix_length_ratio : float
- minimum ratio of the common prefix length to the length of the shorter font type
- Returns
- -------
- bool
- True if the two font types are similar, False otherwise.
- """
- if isinstance(font_type_1, list):
- font_type_1 = font_type_1[0] if font_type_1 else ""
- if isinstance(font_type_2, list):
- font_type_2 = font_type_2[0] if font_type_2 else ""
- if font_type_1 == font_type_2:
- return True
- # Find the length of the common prefix
- common_prefix_length = len(os.path.commonprefix([font_type_1, font_type_2]))
- # Calculate the minimum prefix length based on the ratio
- min_prefix_length = int(min(len(font_type_1), len(font_type_2)) * prefix_length_ratio)
- return common_prefix_length >= min_prefix_length
- def __is_same_block_font(self, block_1, block_2):
- """
- This function compares the font of block1 and block2
- Parameters
- ----------
- block1 : dict
- block1
- block2 : dict
- block2
- Returns
- -------
- is_same : bool
- True if block1 and block2 have the same font, else False
- """
- block_1_font_type = safe_get(block_1, "block_font_type", "")
- block_1_font_size = safe_get(block_1, "block_font_size", 0)
- block_1_avg_char_width = safe_get(block_1, "avg_char_width", 0)
- block_2_font_type = safe_get(block_2, "block_font_type", "")
- block_2_font_size = safe_get(block_2, "block_font_size", 0)
- block_2_avg_char_width = safe_get(block_2, "avg_char_width", 0)
- if isinstance(block_1_font_size, list):
- block_1_font_size = block_1_font_size[0] if block_1_font_size else 0
- if isinstance(block_2_font_size, list):
- block_2_font_size = block_2_font_size[0] if block_2_font_size else 0
- block_1_text = safe_get(block_1, "text", "")
- block_2_text = safe_get(block_2, "text", "")
- if block_1_avg_char_width == 0 or block_2_avg_char_width == 0:
- return False
- if not block_1_text or not block_2_text:
- return False
- else:
- text_len_ratio = len(block_2_text) / len(block_1_text)
- if text_len_ratio < 0.2:
- avg_char_width_condition = (
- abs(block_1_avg_char_width - block_2_avg_char_width) / min(block_1_avg_char_width, block_2_avg_char_width)
- < 0.5
- )
- else:
- avg_char_width_condition = (
- abs(block_1_avg_char_width - block_2_avg_char_width) / min(block_1_avg_char_width, block_2_avg_char_width)
- < 0.2
- )
- block_font_size_condition = abs(block_1_font_size - block_2_font_size) < 1
- return (
- self.__is_similar_font_type(block_1_font_type, block_2_font_type)
- and avg_char_width_condition
- and block_font_size_condition
- )
- def _is_alphabet_char(self, char):
- if (char >= "\u0041" and char <= "\u005a") or (char >= "\u0061" and char <= "\u007a"):
- return True
- else:
- return False
- def _is_chinese_char(self, char):
- if char >= "\u4e00" and char <= "\u9fa5":
- return True
- else:
- return False
- def _is_other_letter_char(self, char):
- try:
- cat = unicodedata.category(char)
- if cat == "Lu" or cat == "Ll":
- return not self._is_alphabet_char(char) and not self._is_chinese_char(char)
- except TypeError:
- print("The input to the function must be a single character.")
- return False
- def _is_year(self, s: str):
- try:
- number = int(s)
- return 1900 <= number <= 2099
- except ValueError:
- return False
- def _match_brackets(self, text):
- # pattern = r"^[\(\)\[\]()【】{}{}<><>〔〕〘〙\"\'“”‘’]"
- pattern = r"^[\(\)\]()】{}{}>>〕〙\"\'“”‘’]"
- return bool(re.match(pattern, text))
- def _is_para_font_consistent(self, para_1, para_2):
- """
- This function compares the font of para1 and para2
- Parameters
- ----------
- para1 : dict
- para1
- para2 : dict
- para2
- Returns
- -------
- is_same : bool
- True if para1 and para2 have the same font, else False
- """
- if para_1 is None or para_2 is None:
- return False
- para_1_font_type = safe_get(para_1, "para_font_type", "")
- para_1_font_size = safe_get(para_1, "para_font_size", 0)
- para_1_font_color = safe_get(para_1, "para_font_color", "")
- para_2_font_type = safe_get(para_2, "para_font_type", "")
- para_2_font_size = safe_get(para_2, "para_font_size", 0)
- para_2_font_color = safe_get(para_2, "para_font_color", "")
- if isinstance(para_1_font_type, list): # get the most common font type
- para_1_font_type = max(set(para_1_font_type), key=para_1_font_type.count)
- if isinstance(para_2_font_type, list):
- para_2_font_type = max(set(para_2_font_type), key=para_2_font_type.count)
- if isinstance(para_1_font_size, list): # compute average font type
- para_1_font_size = sum(para_1_font_size) / len(para_1_font_size)
- if isinstance(para_2_font_size, list): # compute average font type
- para_2_font_size = sum(para_2_font_size) / len(para_2_font_size)
- return (
- self.__is_similar_font_type(para_1_font_type, para_2_font_type)
- and abs(para_1_font_size - para_2_font_size) < 1.5
- # and para_font_color1 == para_font_color2
- )
- def _is_para_puncs_consistent(self, para_1, para_2):
- """
- This function determines whether para1 and para2 are originally from the same paragraph by checking the puncs of para1(former) and para2(latter)
- Parameters
- ----------
- para1 : dict
- para1
- para2 : dict
- para2
- Returns
- -------
- is_same : bool
- True if para1 and para2 are from the same paragraph by using the puncs, else False
- """
- para_1_text = safe_get(para_1, "para_text", "").strip()
- para_2_text = safe_get(para_2, "para_text", "").strip()
- para_1_bboxes = safe_get(para_1, "para_bbox", [])
- para_1_font_sizes = safe_get(para_1, "para_font_size", 0)
- para_2_bboxes = safe_get(para_2, "para_bbox", [])
- para_2_font_sizes = safe_get(para_2, "para_font_size", 0)
- # print_yellow(" Features of determine puncs_consistent:")
- # print(f" para_1_text: {para_1_text}")
- # print(f" para_2_text: {para_2_text}")
- # print(f" para_1_bboxes: {para_1_bboxes}")
- # print(f" para_2_bboxes: {para_2_bboxes}")
- # print(f" para_1_font_sizes: {para_1_font_sizes}")
- # print(f" para_2_font_sizes: {para_2_font_sizes}")
- if is_nested_list(para_1_bboxes):
- x0_1, y0_1, x1_1, y1_1 = para_1_bboxes[-1]
- else:
- x0_1, y0_1, x1_1, y1_1 = para_1_bboxes
- if is_nested_list(para_2_bboxes):
- x0_2, y0_2, x1_2, y1_2 = para_2_bboxes[0]
- para_2_font_sizes = para_2_font_sizes[0] # type: ignore
- else:
- x0_2, y0_2, x1_2, y1_2 = para_2_bboxes
- right_align_threshold = 0.5 * (para_1_font_sizes + para_2_font_sizes) * 0.8
- are_two_paras_right_aligned = abs(x1_1 - x1_2) < right_align_threshold
- left_indent_threshold = 0.5 * (para_1_font_sizes + para_2_font_sizes) * 0.8
- is_para1_left_indent_than_papa2 = x0_1 - x0_2 > left_indent_threshold
- is_para2_left_indent_than_papa1 = x0_2 - x0_1 > left_indent_threshold
- # Check if either para_text1 or para_text2 is empty
- if not para_1_text or not para_2_text:
- return False
- # Define the end puncs for a sentence to end and hyphen
- end_puncs = [".", "?", "!", "。", "?", "!", "…"]
- hyphen = ["-", "—"]
- # Check if para_text1 ends with either hyphen or non-end punctuation or spaces
- para_1_end_with_hyphen = para_1_text and para_1_text[-1] in hyphen
- para_1_end_with_end_punc = para_1_text and para_1_text[-1] in end_puncs
- para_1_end_with_space = para_1_text and para_1_text[-1] == " "
- para_1_not_end_with_end_punc = para_1_text and para_1_text[-1] not in end_puncs
- # print_yellow(f" para_1_end_with_hyphen: {para_1_end_with_hyphen}")
- # print_yellow(f" para_1_end_with_end_punc: {para_1_end_with_end_punc}")
- # print_yellow(f" para_1_not_end_with_end_punc: {para_1_not_end_with_end_punc}")
- # print_yellow(f" para_1_end_with_space: {para_1_end_with_space}")
- if para_1_end_with_hyphen: # If para_text1 ends with hyphen
- # print_red(f"para_1 is end with hyphen.")
- para_2_is_consistent = para_2_text and (
- para_2_text[0] in hyphen
- or (self._is_alphabet_char(para_2_text[0]) and para_2_text[0].islower())
- or (self._is_chinese_char(para_2_text[0]))
- or (self._is_other_letter_char(para_2_text[0]))
- )
- if para_2_is_consistent:
- # print(f"para_2 is consistent.\n")
- return True
- else:
- # print(f"para_2 is not consistent.\n")
- pass
- elif para_1_end_with_end_punc: # If para_text1 ends with ending punctuations
- # print_red(f"para_1 is end with end_punc.")
- para_2_is_consistent = (
- para_2_text
- and (
- para_2_text[0]
- == " "
- # or (self._is_alphabet_char(para_2_text[0]) and para_2_text[0].isupper())
- # or (self._is_chinese_char(para_2_text[0]))
- # or (self._is_other_letter_char(para_2_text[0]))
- )
- and not is_para2_left_indent_than_papa1
- )
- if para_2_is_consistent:
- # print(f"para_2 is consistent.\n")
- return True
- else:
- # print(f"para_2 is not consistent.\n")
- pass
- elif para_1_not_end_with_end_punc: # If para_text1 is not end with ending punctuations
- # print_red(f"para_1 is NOT end with end_punc.")
- para_2_is_consistent = para_2_text and (
- para_2_text[0] == " "
- or (self._is_alphabet_char(para_2_text[0]) and para_2_text[0].islower())
- or (self._is_alphabet_char(para_2_text[0]))
- or (self._is_year(para_2_text[0:4]))
- or (are_two_paras_right_aligned or is_para1_left_indent_than_papa2)
- or (self._is_chinese_char(para_2_text[0]))
- or (self._is_other_letter_char(para_2_text[0]))
- or (self._match_brackets(para_2_text[0]))
- )
- if para_2_is_consistent:
- # print(f"para_2 is consistent.\n")
- return True
- else:
- # print(f"para_2 is not consistent.\n")
- pass
- elif para_1_end_with_space: # If para_text1 ends with space
- # print_red(f"para_1 is end with space.")
- para_2_is_consistent = para_2_text and (
- para_2_text[0] == " "
- or (self._is_alphabet_char(para_2_text[0]) and para_2_text[0].islower())
- or (self._is_chinese_char(para_2_text[0]))
- or (self._is_other_letter_char(para_2_text[0]))
- )
- if para_2_is_consistent:
- # print(f"para_2 is consistent.\n")
- return True
- else:
- pass
- # print(f"para_2 is not consistent.\n")
- return False
- def _is_block_consistent(self, block_1, block_2):
- """
- This function determines whether block1 and block2 are originally from the same block
- Parameters
- ----------
- block1 : dict
- block1s
- block2 : dict
- block2
- Returns
- -------
- is_same : bool
- True if block1 and block2 are from the same block, else False
- """
- return self.__is_same_block_font(block_1, block_2)
- def _is_para_continued(self, para_1, para_2):
- """
- This function determines whether para1 and para2 are originally from the same paragraph
- Parameters
- ----------
- para1 : dict
- para1
- para2 : dict
- para2
- Returns
- -------
- is_same : bool
- True if para1 and para2 are from the same paragraph, else False
- """
- is_para_font_consistent = self._is_para_font_consistent(para_1, para_2)
- is_para_puncs_consistent = self._is_para_puncs_consistent(para_1, para_2)
- return is_para_font_consistent and is_para_puncs_consistent
- def _are_boundaries_of_block_consistent(self, block_1, block_2):
- """
- This function checks if the boundaries of block1 and block2 are consistent
- Parameters
- ----------
- block1 : dict
- block1
- block2 : dict
- block2
- Returns
- -------
- is_consistent : bool
- True if the boundaries of block1 and block2 are consistent, else False
- """
- last_line_of_block_1 = block_1["lines"][-1]
- first_line_of_block_2 = block_2["lines"][0]
- spans_of_last_line_of_block_1 = last_line_of_block_1["spans"]
- spans_of_first_line_of_block_2 = first_line_of_block_2["spans"]
- font_type_of_last_line_of_block_1 = spans_of_last_line_of_block_1[0]["font"].lower()
- font_size_of_last_line_of_block_1 = spans_of_last_line_of_block_1[0]["size"]
- font_color_of_last_line_of_block_1 = spans_of_last_line_of_block_1[0]["color"]
- font_flags_of_last_line_of_block_1 = spans_of_last_line_of_block_1[0]["flags"]
- font_type_of_first_line_of_block_2 = spans_of_first_line_of_block_2[0]["font"].lower()
- font_size_of_first_line_of_block_2 = spans_of_first_line_of_block_2[0]["size"]
- font_color_of_first_line_of_block_2 = spans_of_first_line_of_block_2[0]["color"]
- font_flags_of_first_line_of_block_2 = spans_of_first_line_of_block_2[0]["flags"]
- return (
- self.__is_similar_font_type(font_type_of_last_line_of_block_1, font_type_of_first_line_of_block_2)
- and abs(font_size_of_last_line_of_block_1 - font_size_of_first_line_of_block_2) < 1
- # and font_color_of_last_line_of_block1 == font_color_of_first_line_of_block2
- and font_flags_of_last_line_of_block_1 == font_flags_of_first_line_of_block_2
- )
- def should_merge_next_para(self, curr_para, next_para):
- """
- This function checks if the next_para should be merged into the curr_para.
- Parameters
- ----------
- curr_para : dict
- The current paragraph.
- next_para : dict
- The next paragraph.
- Returns
- -------
- bool
- True if the next_para should be merged into the curr_para, False otherwise.
- """
- if self._is_para_continued(curr_para, next_para):
- return True
- else:
- return False
- def batch_tag_paras(self, pdf_dict):
- """
- This function tags the paragraphs in the pdf_dict.
- Parameters
- ----------
- pdf_dict : dict
- PDF dictionary.
- Returns
- -------
- pdf_dict : dict
- PDF dictionary with tagged paragraphs.
- """
- the_last_page_id = len(pdf_dict) - 1
- for curr_page_idx, (curr_page_id, curr_page_content) in enumerate(pdf_dict.items()):
- if curr_page_id.startswith("page_") and curr_page_content.get("para_blocks", []):
- para_blocks_of_curr_page = curr_page_content["para_blocks"]
- next_page_idx = curr_page_idx + 1
- next_page_id = f"page_{next_page_idx}"
- next_page_content = pdf_dict.get(next_page_id, {})
- for i, current_block in enumerate(para_blocks_of_curr_page):
- for para_id, curr_para in current_block["paras"].items():
- curr_para["curr_para_location"] = [
- curr_page_idx,
- current_block["block_id"],
- int(para_id.split("_")[-1]),
- ]
- curr_para["next_para_location"] = None # 默认设置为None
- curr_para["merge_next_para"] = False # 默认设置为False
- next_block = para_blocks_of_curr_page[i + 1] if i < len(para_blocks_of_curr_page) - 1 else None
- if next_block:
- curr_block_last_para_key = list(current_block["paras"].keys())[-1]
- curr_blk_last_para = current_block["paras"][curr_block_last_para_key]
- next_block_first_para_key = list(next_block["paras"].keys())[0]
- next_blk_first_para = next_block["paras"][next_block_first_para_key]
- if self.should_merge_next_para(curr_blk_last_para, next_blk_first_para):
- curr_blk_last_para["next_para_location"] = [
- curr_page_idx,
- next_block["block_id"],
- int(next_block_first_para_key.split("_")[-1]),
- ]
- curr_blk_last_para["merge_next_para"] = True
- else:
- # Handle the case where the next block is in a different page
- curr_block_last_para_key = list(current_block["paras"].keys())[-1]
- curr_blk_last_para = current_block["paras"][curr_block_last_para_key]
- while not next_page_content.get("para_blocks", []) and next_page_idx <= the_last_page_id:
- next_page_idx += 1
- next_page_id = f"page_{next_page_idx}"
- next_page_content = pdf_dict.get(next_page_id, {})
- if next_page_content.get("para_blocks", []):
- next_blk_first_para_key = list(next_page_content["para_blocks"][0]["paras"].keys())[0]
- next_blk_first_para = next_page_content["para_blocks"][0]["paras"][next_blk_first_para_key]
- if self.should_merge_next_para(curr_blk_last_para, next_blk_first_para):
- curr_blk_last_para["next_para_location"] = [
- next_page_idx,
- next_page_content["para_blocks"][0]["block_id"],
- int(next_blk_first_para_key.split("_")[-1]),
- ]
- curr_blk_last_para["merge_next_para"] = True
- return pdf_dict
- def find_block_by_id(self, para_blocks, block_id):
- """
- This function finds a block by its id.
- Parameters
- ----------
- para_blocks : list
- List of blocks.
- block_id : int
- Id of the block to find.
- Returns
- -------
- block : dict
- The block with the given id.
- """
- for block in para_blocks:
- if block.get("block_id") == block_id:
- return block
- return None
- def batch_merge_paras(self, pdf_dict):
- """
- This function merges the paragraphs in the pdf_dict.
- Parameters
- ----------
- pdf_dict : dict
- PDF dictionary.
- Returns
- -------
- pdf_dict : dict
- PDF dictionary with merged paragraphs.
- """
- for page_id, page_content in pdf_dict.items():
- if page_id.startswith("page_") and page_content.get("para_blocks", []):
- para_blocks_of_page = page_content["para_blocks"]
- for i in range(len(para_blocks_of_page)):
- current_block = para_blocks_of_page[i]
- paras = current_block["paras"]
- for para_id, curr_para in list(paras.items()):
- # 跳过标题段落
- if curr_para.get("is_para_title"):
- continue
- while curr_para.get("merge_next_para"):
- next_para_location = curr_para.get("next_para_location")
- if not next_para_location:
- break
- next_page_idx, next_block_id, next_para_id = next_para_location
- next_page_id = f"page_{next_page_idx}"
- next_page_content = pdf_dict.get(next_page_id)
- if not next_page_content:
- break
- next_block = self.find_block_by_id(next_page_content.get("para_blocks", []), next_block_id)
- if not next_block:
- break
- next_para = next_block["paras"].get(f"para_{next_para_id}")
- if not next_para or next_para.get("is_para_title"):
- break
- # 合并段落文本
- curr_para_text = curr_para.get("para_text", "")
- next_para_text = next_para.get("para_text", "")
- curr_para["para_text"] = curr_para_text + " " + next_para_text
- # 更新 next_para_location
- curr_para["next_para_location"] = next_para.get("next_para_location")
- # 将下一个段落文本置为空,表示已被合并
- next_para["para_text"] = ""
- # 更新 merge_next_para 标记
- curr_para["merge_next_para"] = next_para.get("merge_next_para", False)
- return pdf_dict
- class DrawAnnos:
- """
- This class draws annotations on the pdf file
- ----------------------------------------
- Color Code
- ----------------------------------------
- Red: (1, 0, 0)
- Green: (0, 1, 0)
- Blue: (0, 0, 1)
- Yellow: (1, 1, 0) - mix of red and green
- Cyan: (0, 1, 1) - mix of green and blue
- Magenta: (1, 0, 1) - mix of red and blue
- White: (1, 1, 1) - red, green and blue full intensity
- Black: (0, 0, 0) - no color component whatsoever
- Gray: (0.5, 0.5, 0.5) - equal and medium intensity of red, green and blue color components
- Orange: (1, 0.65, 0) - maximum intensity of red, medium intensity of green, no blue component
- """
- def __init__(self) -> None:
- pass
- def __is_nested_list(self, lst):
- """
- This function returns True if the given list is a nested list of any degree.
- """
- if isinstance(lst, list):
- return any(self.__is_nested_list(i) for i in lst) or any(isinstance(i, list) for i in lst)
- return False
- def __valid_rect(self, bbox):
- # Ensure that the rectangle is not empty or invalid
- if isinstance(bbox[0], list):
- return False # It's a nested list, hence it can't be valid rect
- else:
- return bbox[0] < bbox[2] and bbox[1] < bbox[3]
- def __draw_nested_boxes(self, page, nested_bbox, color=(0, 1, 1)):
- """
- This function draws the nested boxes
- Parameters
- ----------
- page : fitz.Page
- page
- nested_bbox : list
- nested bbox
- color : tuple
- color, by default (0, 1, 1) # draw with cyan color for combined paragraph
- """
- if self.__is_nested_list(nested_bbox): # If it's a nested list
- for bbox in nested_bbox:
- self.__draw_nested_boxes(page, bbox, color) # Recursively call the function
- elif self.__valid_rect(nested_bbox): # If valid rectangle
- para_rect = fitz.Rect(nested_bbox)
- para_anno = page.add_rect_annot(para_rect)
- para_anno.set_colors(stroke=color) # draw with cyan color for combined paragraph
- para_anno.set_border(width=1)
- para_anno.update()
- def draw_annos(self, input_pdf_path, pdf_dic, output_pdf_path):
- """
- This function draws annotations on the pdf file.
- Parameters
- ----------
- input_pdf_path : str
- path to the input pdf file
- pdf_dic : dict
- pdf dictionary
- output_pdf_path : str
- path to the output pdf file
- pdf_dic : dict
- pdf dictionary
- """
- pdf_doc = open_pdf(input_pdf_path)
- if pdf_dic is None:
- pdf_dic = {}
- if output_pdf_path is None:
- output_pdf_path = input_pdf_path.replace(".pdf", "_anno.pdf")
- for page_id, page in enumerate(pdf_doc): # type: ignore
- page_key = f"page_{page_id}"
- for ele_key, ele_data in pdf_dic[page_key].items():
- if ele_key == "para_blocks":
- para_blocks = ele_data
- for para_block in para_blocks:
- if "paras" in para_block.keys():
- paras = para_block["paras"]
- for para_key, para_content in paras.items():
- para_bbox = para_content["para_bbox"]
- # print(f"para_bbox: {para_bbox}")
- # print(f"is a nested list: {self.__is_nested_list(para_bbox)}")
- if self.__is_nested_list(para_bbox) and len(para_bbox) > 1:
- color = (0, 1, 1)
- self.__draw_nested_boxes(
- page, para_bbox, color
- ) # draw with cyan color for combined paragraph
- else:
- if self.__valid_rect(para_bbox):
- para_rect = fitz.Rect(para_bbox)
- para_anno = page.add_rect_annot(para_rect)
- para_anno.set_colors(stroke=(0, 1, 0)) # draw with green color for normal paragraph
- para_anno.set_border(width=0.5)
- para_anno.update()
- is_para_title = para_content["is_para_title"]
- if is_para_title:
- if self.__is_nested_list(para_content["para_bbox"]) and len(para_content["para_bbox"]) > 1:
- color = (0, 0, 1)
- self.__draw_nested_boxes(
- page, para_content["para_bbox"], color
- ) # draw with cyan color for combined title
- else:
- if self.__valid_rect(para_content["para_bbox"]):
- para_rect = fitz.Rect(para_content["para_bbox"])
- if self.__valid_rect(para_content["para_bbox"]):
- para_anno = page.add_rect_annot(para_rect)
- para_anno.set_colors(stroke=(0, 0, 1)) # draw with blue color for normal title
- para_anno.set_border(width=0.5)
- para_anno.update()
- pdf_doc.save(output_pdf_path)
- pdf_doc.close()
- class ParaProcessPipeline:
- def __init__(self) -> None:
- pass
- def para_process_pipeline(self, pdf_info_dict, para_debug_mode=None, input_pdf_path=None, output_pdf_path=None):
- """
- This function processes the paragraphs, including:
- 1. Read raw input json file into pdf_dic
- 2. Detect and replace equations
- 3. Combine spans into a natural line
- 4. Check if the paragraphs are inside bboxes passed from "layout_bboxes" key
- 5. Compute statistics for each block
- 6. Detect titles in the document
- 7. Detect paragraphs inside each block
- 8. Divide the level of the titles
- 9. Detect and combine paragraphs from different blocks into one paragraph
- 10. Check whether the final results after checking headings, dividing paragraphs within blocks, and merging paragraphs between blocks are plausible and reasonable.
- 11. Draw annotations on the pdf file
- Parameters
- ----------
- pdf_dic_json_fpath : str
- path to the pdf dictionary json file.
- Notice: data noises, including overlap blocks, header, footer, watermark, vertical margin note have been removed already.
- input_pdf_doc : str
- path to the input pdf file
- output_pdf_path : str
- path to the output pdf file
- Returns
- -------
- pdf_dict : dict
- result dictionary
- """
- error_info = None
- output_json_file = ""
- output_dir = ""
- if input_pdf_path is not None:
- input_pdf_path = os.path.abspath(input_pdf_path)
- # print_green_on_red(f">>>>>>>>>>>>>>>>>>> Process the paragraphs of {input_pdf_path}")
- if output_pdf_path is not None:
- output_dir = os.path.dirname(output_pdf_path)
- output_json_file = f"{output_dir}/pdf_dic.json"
- def __save_pdf_dic(pdf_dic, output_pdf_path, stage="0", para_debug_mode=para_debug_mode):
- """
- Save the pdf_dic to a json file
- """
- output_pdf_file_name = os.path.basename(output_pdf_path)
- # output_dir = os.path.dirname(output_pdf_path)
- output_dir = "\\tmp\\pdf_parse"
- output_pdf_file_name = output_pdf_file_name.replace(".pdf", f"_stage_{stage}.json")
- pdf_dic_json_fpath = os.path.join(output_dir, output_pdf_file_name)
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
- if para_debug_mode == "full":
- with open(pdf_dic_json_fpath, "w", encoding="utf-8") as f:
- json.dump(pdf_dic, f, indent=2, ensure_ascii=False)
- # Validate the output already exists
- if not os.path.exists(pdf_dic_json_fpath):
- print_red(f"Failed to save the pdf_dic to {pdf_dic_json_fpath}")
- return None
- else:
- print_green(f"Succeed to save the pdf_dic to {pdf_dic_json_fpath}")
- return pdf_dic_json_fpath
- """
- Preprocess the lines of block
- """
- # Combine spans into a natural line
- rawBlockProcessor = RawBlockProcessor()
- pdf_dic = rawBlockProcessor.batch_process_blocks(pdf_info_dict)
- # print(f"pdf_dic['page_0']['para_blocks'][0]: {pdf_dic['page_0']['para_blocks'][0]}", end="\n\n")
- # Check if the paragraphs are inside bboxes passed from "layout_bboxes" key
- layoutFilter = LayoutFilterProcessor()
- pdf_dic = layoutFilter.batch_process_blocks(pdf_dic)
- # Compute statistics for each block
- blockStatisticsCalculator = BlockStatisticsCalculator()
- pdf_dic = blockStatisticsCalculator.batch_process_blocks(pdf_dic)
- # print(f"pdf_dic['page_0']['para_blocks'][0]: {pdf_dic['page_0']['para_blocks'][0]}", end="\n\n")
- # Compute statistics for all blocks(namely this pdf document)
- docStatisticsCalculator = DocStatisticsCalculator()
- pdf_dic = docStatisticsCalculator.calc_stats_of_doc(pdf_dic)
- # print(f"pdf_dic['statistics']: {pdf_dic['statistics']}", end="\n\n")
- # Dump the first three stages of pdf_dic to a json file
- if para_debug_mode == "full":
- pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="0", para_debug_mode=para_debug_mode)
- """
- Detect titles in the document
- """
- doc_statistics = pdf_dic["statistics"]
- titleProcessor = TitleProcessor(doc_statistics)
- titleProcessor.stage = 0
- pdf_dic = titleProcessor.batch_detect_titles(pdf_dic)
- if para_debug_mode == "full":
- pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="1", para_debug_mode=para_debug_mode)
- """
- Detect and divide the level of the titles
- """
- titleProcessor = TitleProcessor()
- pdf_dic = titleProcessor.batch_recog_title_level(pdf_dic)
- if para_debug_mode == "full":
- pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="2", para_debug_mode=para_debug_mode)
- """
- Detect and split paragraphs inside each block
- """
- blockInnerParasProcessor = BlockTerminationProcessor()
- pdf_dic = blockInnerParasProcessor.batch_process_blocks(pdf_dic)
- if para_debug_mode == "full":
- pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="3", para_debug_mode=para_debug_mode)
- # pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="3", para_debug_mode="full")
- # print_green(f"pdf_dic_json_fpath: {pdf_dic_json_fpath}")
- """
- Detect and combine paragraphs from different blocks into one paragraph
- """
- blockContinuationProcessor = BlockContinuationProcessor()
- pdf_dic = blockContinuationProcessor.batch_tag_paras(pdf_dic)
- pdf_dic = blockContinuationProcessor.batch_merge_paras(pdf_dic)
- """
- Detect titles in the document again
- """
- doc_statistics = pdf_dic["statistics"]
- titleProcessor = TitleProcessor(doc_statistics)
- titleProcessor.stage = 1
- # pdf_dic = titleProcessor.batch_detect_titles(pdf_dic)
- if para_debug_mode == "full":
- pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="4", para_debug_mode=para_debug_mode)
- # pdf_dic_json_fpath = __save_pdf_dic(pdf_dic, output_pdf_path, stage="4", para_debug_mode="full")
- # print_green(f"pdf_dic_json_fpath: {pdf_dic_json_fpath}")
- """
- Discard pdf files by checking exceptions and return the error info to the caller
- """
- discardByException = DiscardByException()
- is_discard_by_single_line_block = discardByException.discard_by_single_line_block(
- pdf_dic, exception=DenseSingleLineBlockException()
- )
- is_discard_by_title_detection = discardByException.discard_by_title_detection(
- pdf_dic, exception=TitleDetectionException()
- )
- is_discard_by_title_level = discardByException.discard_by_title_level(pdf_dic, exception=TitleLevelException())
- is_discard_by_split_para = discardByException.discard_by_split_para(pdf_dic, exception=ParaSplitException())
- is_discard_by_merge_para = discardByException.discard_by_merge_para(pdf_dic, exception=ParaMergeException())
- if is_discard_by_single_line_block is not None:
- error_info = is_discard_by_single_line_block
- elif is_discard_by_title_detection is not None:
- error_info = is_discard_by_title_detection
- elif is_discard_by_title_level is not None:
- error_info = is_discard_by_title_level
- elif is_discard_by_split_para is not None:
- error_info = is_discard_by_split_para
- elif is_discard_by_merge_para is not None:
- error_info = is_discard_by_merge_para
- if error_info is not None:
- return pdf_dic, error_info
- """
- Dump the final pdf_dic to a json file
- """
- if para_debug_mode is not None:
- with open(output_json_file, "w", encoding="utf-8") as f:
- json.dump(pdf_info_dict, f, ensure_ascii=False, indent=4)
- """
- Draw the annotations
- """
- if para_debug_mode is not None:
- drawAnnos = DrawAnnos()
- drawAnnos.draw_annos(input_pdf_path, pdf_dic, output_pdf_path)
- """
- Remove the intermediate files which are generated in the process of paragraph processing if debug_mode is simple
- """
- if para_debug_mode is not None:
- for fpath in os.listdir(output_dir):
- if fpath.endswith(".json") and "stage" in fpath:
- os.remove(os.path.join(output_dir, fpath))
- return pdf_dic, error_info
- """
- Run this script to test the function with Command:
- python detect_para.py [pdf_path] [output_pdf_path]
- Params:
- - pdf_path: the path of the pdf file
- - output_pdf_path: the path of the output pdf file
- """
- if __name__ == "__main__":
- DEFAULT_PDF_PATH = (
- "app/pdf_toolbox/tests/assets/paper/paper.pdf" if os.name != "nt" else "app\\pdf_toolbox\\tests\\assets\\paper\\paper.pdf"
- )
- input_pdf_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_PDF_PATH
- output_pdf_path = sys.argv[2] if len(sys.argv) > 2 else input_pdf_path.split(".")[0] + "_recogPara.pdf"
- output_json_path = sys.argv[3] if len(sys.argv) > 3 else input_pdf_path.split(".")[0] + "_recogPara.json"
- import stat
- # Remove existing output file if it exists
- if os.path.exists(output_pdf_path):
- os.chmod(output_pdf_path, stat.S_IWRITE)
- os.remove(output_pdf_path)
- input_pdf_doc = open_pdf(input_pdf_path)
- # postprocess the paragraphs
- paraProcessPipeline = ParaProcessPipeline()
- # parse paragraph and save to json file
- pdf_dic = {}
- blockInnerParasProcessor = BlockTerminationProcessor()
- """
- Construct the pdf dictionary.
- """
- for page_id, page in enumerate(input_pdf_doc): # type: ignore
- # print(f"Processing page {page_id}")
- # print(f"page: {page}")
- raw_blocks = page.get_text("dict")["blocks"]
- # Save text blocks to "preproc_blocks"
- preproc_blocks = []
- for block in raw_blocks:
- if block["type"] == 0:
- preproc_blocks.append(block)
- layout_bboxes = []
- # Construct the pdf dictionary as schema above
- page_dict = {
- "para_blocks": None,
- "preproc_blocks": preproc_blocks,
- "images": None,
- "tables": None,
- "interline_equations": None,
- "inline_equations": None,
- "layout_bboxes": None,
- "pymu_raw_blocks": None,
- "global_statistic": None,
- "droped_text_block": None,
- "droped_image_block": None,
- "droped_table_block": None,
- "image_backup": None,
- "table_backup": None,
- }
- pdf_dic[f"page_{page_id}"] = page_dict
- # print(f"pdf_dic: {pdf_dic}")
- with open(output_json_path, "w", encoding="utf-8") as f:
- json.dump(pdf_dic, f, ensure_ascii=False, indent=4)
- pdf_dic = paraProcessPipeline.para_process_pipeline(output_json_path, input_pdf_doc, output_pdf_path)
|