- # Copyright (c) 2016-present, Gregory Szorc
- # All rights reserved.
- #
- # This software may be modified and distributed under the terms
- # of the BSD license. See the LICENSE file for details.
- """Python interface to the Zstandard (zstd) compression library."""
- from __future__ import absolute_import, unicode_literals
- # This should match what the C extension exports.
- __all__ = [
- "BufferSegment",
- "BufferSegments",
- "BufferWithSegments",
- "BufferWithSegmentsCollection",
- "ZstdCompressionChunker",
- "ZstdCompressionDict",
- "ZstdCompressionObj",
- "ZstdCompressionParameters",
- "ZstdCompressionReader",
- "ZstdCompressionWriter",
- "ZstdCompressor",
- "ZstdDecompressionObj",
- "ZstdDecompressionReader",
- "ZstdDecompressionWriter",
- "ZstdDecompressor",
- "ZstdError",
- "FrameParameters",
- "backend_features",
- "estimate_decompression_context_size",
- "frame_content_size",
- "frame_header_size",
- "get_frame_parameters",
- "train_dictionary",
- # Constants.
- "FLUSH_BLOCK",
- "FLUSH_FRAME",
- "COMPRESSOBJ_FLUSH_FINISH",
- "COMPRESSOBJ_FLUSH_BLOCK",
- "ZSTD_VERSION",
- "FRAME_HEADER",
- "CONTENTSIZE_UNKNOWN",
- "CONTENTSIZE_ERROR",
- "MAX_COMPRESSION_LEVEL",
- "COMPRESSION_RECOMMENDED_INPUT_SIZE",
- "COMPRESSION_RECOMMENDED_OUTPUT_SIZE",
- "DECOMPRESSION_RECOMMENDED_INPUT_SIZE",
- "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE",
- "MAGIC_NUMBER",
- "BLOCKSIZELOG_MAX",
- "BLOCKSIZE_MAX",
- "WINDOWLOG_MIN",
- "WINDOWLOG_MAX",
- "CHAINLOG_MIN",
- "CHAINLOG_MAX",
- "HASHLOG_MIN",
- "HASHLOG_MAX",
- "MINMATCH_MIN",
- "MINMATCH_MAX",
- "SEARCHLOG_MIN",
- "SEARCHLOG_MAX",
- "SEARCHLENGTH_MIN",
- "SEARCHLENGTH_MAX",
- "TARGETLENGTH_MIN",
- "TARGETLENGTH_MAX",
- "LDM_MINMATCH_MIN",
- "LDM_MINMATCH_MAX",
- "LDM_BUCKETSIZELOG_MAX",
- "STRATEGY_FAST",
- "STRATEGY_DFAST",
- "STRATEGY_GREEDY",
- "STRATEGY_LAZY",
- "STRATEGY_LAZY2",
- "STRATEGY_BTLAZY2",
- "STRATEGY_BTOPT",
- "STRATEGY_BTULTRA",
- "STRATEGY_BTULTRA2",
- "DICT_TYPE_AUTO",
- "DICT_TYPE_RAWCONTENT",
- "DICT_TYPE_FULLDICT",
- "FORMAT_ZSTD1",
- "FORMAT_ZSTD1_MAGICLESS",
- ]
- import io
- import os
- from ._cffi import ( # type: ignore
- ffi,
- lib,
- )
- backend_features = set() # type: ignore
- COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
- COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
- DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
- DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()
- new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)
- MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
- MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
- FRAME_HEADER = b"\x28\xb5\x2f\xfd"
- CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
- CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
- ZSTD_VERSION = (
- lib.ZSTD_VERSION_MAJOR,
- lib.ZSTD_VERSION_MINOR,
- lib.ZSTD_VERSION_RELEASE,
- )
- BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
- BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
- WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
- WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
- CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
- CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
- HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
- HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
- MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN
- MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX
- SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
- SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
- SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN
- SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX
- TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
- TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
- LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
- LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
- LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX
- STRATEGY_FAST = lib.ZSTD_fast
- STRATEGY_DFAST = lib.ZSTD_dfast
- STRATEGY_GREEDY = lib.ZSTD_greedy
- STRATEGY_LAZY = lib.ZSTD_lazy
- STRATEGY_LAZY2 = lib.ZSTD_lazy2
- STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
- STRATEGY_BTOPT = lib.ZSTD_btopt
- STRATEGY_BTULTRA = lib.ZSTD_btultra
- STRATEGY_BTULTRA2 = lib.ZSTD_btultra2
- DICT_TYPE_AUTO = lib.ZSTD_dct_auto
- DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
- DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict
- FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
- FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless
- FLUSH_BLOCK = 0
- FLUSH_FRAME = 1
- COMPRESSOBJ_FLUSH_FINISH = 0
- COMPRESSOBJ_FLUSH_BLOCK = 1
- def _cpu_count():
- # os.cpu_count() was introduced in Python 3.4.
- try:
- return os.cpu_count() or 0
- except AttributeError:
- pass
- # Linux.
- try:
- return os.sysconf("SC_NPROCESSORS_ONLN")
- except (AttributeError, ValueError):
- pass
- # TODO implement on other platforms.
- return 0
- class BufferSegment:
- """Represents a segment within a ``BufferWithSegments``.
- This type is essentially a reference to N bytes within a
- ``BufferWithSegments``.
- The object conforms to the buffer protocol.
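- A usage sketch (assuming ``seg`` was obtained from a parent
- ``BufferWithSegments``):
- >>> start, length = seg.offset, len(seg)
- >>> view = memoryview(seg)   # zero-copy, via the buffer protocol
- >>> payload = seg.tobytes()  # independent copy of the segment's bytes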
- """
- @property
- def offset(self):
- """The byte offset of this segment within its parent buffer."""
- raise NotImplementedError()
- def __len__(self):
- """Obtain the length of the segment, in bytes."""
- raise NotImplementedError()
- def tobytes(self):
- """Obtain bytes copy of this segment."""
- raise NotImplementedError()
- class BufferSegments:
- """Represents an array of ``(offset, length)`` integers.
- This type is effectively an index used by :py:class:`BufferWithSegments`.
- The array members are 64-bit unsigned integers using host/native bit order.
- Instances conform to the buffer protocol.
- """
- class BufferWithSegments:
- """A memory buffer containing N discrete items of known lengths.
- This type is essentially a fixed size memory address and an array
- of 2-tuples of ``(offset, length)`` 64-bit unsigned native-endian
- integers defining the byte offset and length of each segment within
- the buffer.
- Instances behave like containers.
- Instances also conform to the buffer protocol. So a reference to the
- backing bytes can be obtained via ``memoryview(o)``. A *copy* of the
- backing bytes can be obtained via ``.tobytes()``.
- This type exists to facilitate operations against N>1 items without
- the overhead of Python object creation and management. Used with
- APIs like :py:meth:`ZstdDecompressor.multi_decompress_to_buffer`, it
- is possible to decompress many objects in parallel without the GIL
- held, leading to even better performance.
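- A sketch of typical access (assuming ``buf`` is a populated instance):
- >>> n = len(buf)            # number of segments
- >>> segment = buf[0]        # BufferSegment referencing memory in buf
- >>> index = buf.segments()  # BufferSegments index of (offset, length)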
- """
- @property
- def size(self):
- """Total sizein bytes of the backing buffer."""
- raise NotImplementedError()
- def __len__(self):
- raise NotImplementedError()
- def __getitem__(self, i):
- """Obtains a segment within the buffer.
- The returned object references memory within this buffer.
- :param i:
- Integer index of segment to retrieve.
- :return:
- :py:class:`BufferSegment`
- """
- raise NotImplementedError()
- def segments(self):
- """Obtain the array of ``(offset, length)`` segments in the buffer.
- :return:
- :py:class:`BufferSegments`
- """
- raise NotImplementedError()
- def tobytes(self):
- """Obtain bytes copy of this instance."""
- raise NotImplementedError()
- class BufferWithSegmentsCollection:
- """A virtual spanning view over multiple BufferWithSegments.
- Instances are constructed from 1 or more :py:class:`BufferWithSegments`
- instances. The resulting object behaves like an ordered sequence whose
- members are the segments within each ``BufferWithSegments``.
- If the object is composed of 2 ``BufferWithSegments`` instances with the
- first having 2 segments and the second having 3 segments, then ``b[0]``
- and ``b[1]`` access segments in the first object and ``b[2]``, ``b[3]``,
- and ``b[4]`` access segments from the second.
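- For example (a sketch; ``b1`` has 2 segments, ``b2`` has 3):
- >>> coll = zstandard.BufferWithSegmentsCollection(b1, b2)
- >>> len(coll)
- 5
- >>> seg = coll[2]  # first segment of b2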
- """
- def __len__(self):
- """The number of segments within all ``BufferWithSegments``."""
- raise NotImplementedError()
- def __getitem__(self, i):
- """Obtain the ``BufferSegment`` at an offset."""
- raise NotImplementedError()
- class ZstdError(Exception):
- pass
- def _zstd_error(zresult):
- # ffi.string() returns bytes on both Python 2 and 3. The result is
- # interpolated into unicode error messages, so decode it to unicode.
- return ffi.string(lib.ZSTD_getErrorName(zresult)).decode("utf-8")
- def _make_cctx_params(params):
- res = lib.ZSTD_createCCtxParams()
- if res == ffi.NULL:
- raise MemoryError()
- res = ffi.gc(res, lib.ZSTD_freeCCtxParams)
- attrs = [
- (lib.ZSTD_c_format, params.format),
- (lib.ZSTD_c_compressionLevel, params.compression_level),
- (lib.ZSTD_c_windowLog, params.window_log),
- (lib.ZSTD_c_hashLog, params.hash_log),
- (lib.ZSTD_c_chainLog, params.chain_log),
- (lib.ZSTD_c_searchLog, params.search_log),
- (lib.ZSTD_c_minMatch, params.min_match),
- (lib.ZSTD_c_targetLength, params.target_length),
- (lib.ZSTD_c_strategy, params.strategy),
- (lib.ZSTD_c_contentSizeFlag, params.write_content_size),
- (lib.ZSTD_c_checksumFlag, params.write_checksum),
- (lib.ZSTD_c_dictIDFlag, params.write_dict_id),
- (lib.ZSTD_c_nbWorkers, params.threads),
- (lib.ZSTD_c_jobSize, params.job_size),
- (lib.ZSTD_c_overlapLog, params.overlap_log),
- (lib.ZSTD_c_forceMaxWindow, params.force_max_window),
- (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm),
- (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log),
- (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match),
- (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log),
- (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log),
- ]
- for param, value in attrs:
- _set_compression_parameter(res, param, value)
- return res
- class ZstdCompressionParameters(object):
- """Low-level zstd compression parameters.
- This type represents a collection of parameters to control how zstd
- compression is performed.
- Instances can be constructed from raw parameters or derived from a
- base set of defaults specified from a compression level (recommended)
- via :py:meth:`ZstdCompressionParameters.from_level`.
- >>> # Derive compression settings for compression level 7.
- >>> params = zstandard.ZstdCompressionParameters.from_level(7)
- >>> # With an input size of 1MB
- >>> params = zstandard.ZstdCompressionParameters.from_level(7, source_size=1048576)
- Using ``from_level()``, it is also possible to override individual compression
- parameters or to define additional settings that aren't automatically derived.
- e.g.:
- >>> params = zstandard.ZstdCompressionParameters.from_level(4, window_log=10)
- >>> params = zstandard.ZstdCompressionParameters.from_level(5, threads=4)
- Or you can define low-level compression settings directly:
- >>> params = zstandard.ZstdCompressionParameters(window_log=12, enable_ldm=True)
- Once a ``ZstdCompressionParameters`` instance is obtained, it can be used to
- configure a compressor:
- >>> cctx = zstandard.ZstdCompressor(compression_params=params)
- Some of these are very low-level settings. It may help to consult the official
- zstandard documentation for their behavior. Look for the ``ZSTD_p_*`` constants
- in ``zstd.h`` (https://github.com/facebook/zstd/blob/dev/lib/zstd.h).
- """
- @staticmethod
- def from_level(level, source_size=0, dict_size=0, **kwargs):
- """Create compression parameters from a compression level.
- :param level:
- Integer compression level.
- :param source_size:
- Integer size in bytes of source to be compressed.
- :param dict_size:
- Integer size in bytes of compression dictionary to use.
- :return:
- :py:class:`ZstdCompressionParameters`
- """
- params = lib.ZSTD_getCParams(level, source_size, dict_size)
- args = {
- "window_log": "windowLog",
- "chain_log": "chainLog",
- "hash_log": "hashLog",
- "search_log": "searchLog",
- "min_match": "minMatch",
- "target_length": "targetLength",
- "strategy": "strategy",
- }
- for arg, attr in args.items():
- if arg not in kwargs:
- kwargs[arg] = getattr(params, attr)
- return ZstdCompressionParameters(**kwargs)
- def __init__(
- self,
- format=0,
- compression_level=0,
- window_log=0,
- hash_log=0,
- chain_log=0,
- search_log=0,
- min_match=0,
- target_length=0,
- strategy=-1,
- write_content_size=1,
- write_checksum=0,
- write_dict_id=0,
- job_size=0,
- overlap_log=-1,
- force_max_window=0,
- enable_ldm=0,
- ldm_hash_log=0,
- ldm_min_match=0,
- ldm_bucket_size_log=0,
- ldm_hash_rate_log=-1,
- threads=0,
- ):
- params = lib.ZSTD_createCCtxParams()
- if params == ffi.NULL:
- raise MemoryError()
- params = ffi.gc(params, lib.ZSTD_freeCCtxParams)
- self._params = params
- if threads < 0:
- threads = _cpu_count()
- # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog
- # because setting ZSTD_c_nbWorkers resets the other parameters.
- _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads)
- _set_compression_parameter(params, lib.ZSTD_c_format, format)
- _set_compression_parameter(
- params, lib.ZSTD_c_compressionLevel, compression_level
- )
- _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log)
- _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log)
- _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log)
- _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log)
- _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match)
- _set_compression_parameter(
- params, lib.ZSTD_c_targetLength, target_length
- )
- if strategy == -1:
- strategy = 0
- _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy)
- _set_compression_parameter(
- params, lib.ZSTD_c_contentSizeFlag, write_content_size
- )
- _set_compression_parameter(
- params, lib.ZSTD_c_checksumFlag, write_checksum
- )
- _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id)
- _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size)
- if overlap_log == -1:
- overlap_log = 0
- _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log)
- _set_compression_parameter(
- params, lib.ZSTD_c_forceMaxWindow, force_max_window
- )
- _set_compression_parameter(
- params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm
- )
- _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log)
- _set_compression_parameter(
- params, lib.ZSTD_c_ldmMinMatch, ldm_min_match
- )
- _set_compression_parameter(
- params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log
- )
- if ldm_hash_rate_log == -1:
- ldm_hash_rate_log = 0
- _set_compression_parameter(
- params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log
- )
- @property
- def format(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_format)
- @property
- def compression_level(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_compressionLevel
- )
- @property
- def window_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog)
- @property
- def hash_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog)
- @property
- def chain_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog)
- @property
- def search_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog)
- @property
- def min_match(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch)
- @property
- def target_length(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength)
- @property
- def strategy(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_strategy)
- @property
- def write_content_size(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_contentSizeFlag
- )
- @property
- def write_checksum(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag)
- @property
- def write_dict_id(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag)
- @property
- def job_size(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize)
- @property
- def overlap_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog)
- @property
- def force_max_window(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_forceMaxWindow
- )
- @property
- def enable_ldm(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_enableLongDistanceMatching
- )
- @property
- def ldm_hash_log(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog)
- @property
- def ldm_min_match(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch)
- @property
- def ldm_bucket_size_log(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_ldmBucketSizeLog
- )
- @property
- def ldm_hash_rate_log(self):
- return _get_compression_parameter(
- self._params, lib.ZSTD_c_ldmHashRateLog
- )
- @property
- def threads(self):
- return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers)
- def estimated_compression_context_size(self):
- """Estimated size in bytes needed to compress with these parameters."""
- return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params)
- def estimate_decompression_context_size():
- """Estimate the memory size requirements for a decompressor instance.
- :return:
- Integer number of bytes.
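- A minimal usage sketch:
- >>> budget = zstandard.estimate_decompression_context_size()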
- """
- return lib.ZSTD_estimateDCtxSize()
- def _set_compression_parameter(params, param, value):
- zresult = lib.ZSTD_CCtxParams_setParameter(params, param, value)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "unable to set compression context parameter: %s"
- % _zstd_error(zresult)
- )
- def _get_compression_parameter(params, param):
- result = ffi.new("int *")
- zresult = lib.ZSTD_CCtxParams_getParameter(params, param, result)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "unable to get compression context parameter: %s"
- % _zstd_error(zresult)
- )
- return result[0]
- class ZstdCompressionWriter(object):
- """Writable compressing stream wrapper.
- ``ZstdCompressionWriter`` is a write-only stream interface for writing
- compressed data to another stream.
- This type conforms to the ``io.RawIOBase`` interface and should be usable
- by any type that operates against a *file-object* (``typing.BinaryIO``
- in Python type hinting speak). Only methods that involve writing will do
- useful things.
- As data is written to this stream (e.g. via ``write()``), that data
- is sent to the compressor. As compressed data becomes available from
- the compressor, it is sent to the underlying stream by calling its
- ``write()`` method.
- Both ``write()`` and ``flush()`` return the number of bytes written to the
- object's ``write()``. In many cases, small inputs do not accumulate enough
- data to cause a write and ``write()`` will return ``0``.
- Calling ``close()`` will mark the stream as closed and subsequent I/O
- operations will raise ``ValueError`` (per the documented behavior of
- ``io.RawIOBase``). ``close()`` will also call ``close()`` on the underlying
- stream if such a method exists and the instance was constructed with
- ``closefd=True``.
- Instances are obtained by calling :py:meth:`ZstdCompressor.stream_writer`.
- Typical usage is as follows:
- >>> cctx = zstandard.ZstdCompressor(level=10)
- >>> compressor = cctx.stream_writer(fh)
- >>> compressor.write(b"chunk 0\\n")
- >>> compressor.write(b"chunk 1\\n")
- >>> compressor.flush()
- >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\n`` at this point.
- >>> # Receiver is also expecting more data in the zstd *frame*.
- >>>
- >>> compressor.write(b"chunk 2\\n")
- >>> compressor.flush(zstandard.FLUSH_FRAME)
- >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\nchunk 2``.
- >>> # Receiver is expecting no more data, as the zstd frame is closed.
- >>> # Any future calls to ``write()`` at this point will construct a new
- >>> # zstd frame.
- Instances can be used as context managers. Exiting the context manager is
- the equivalent of calling ``close()``, which is equivalent to calling
- ``flush(zstandard.FLUSH_FRAME)``:
- >>> cctx = zstandard.ZstdCompressor(level=10)
- >>> with cctx.stream_writer(fh) as compressor:
- ... compressor.write(b'chunk 0')
- ... compressor.write(b'chunk 1')
- ... ...
- .. important::
- If ``flush(FLUSH_FRAME)`` is not called, emitted data doesn't
- constitute a full zstd *frame* and consumers of this data may complain
- about malformed input. It is recommended to use instances as a context
- manager to ensure *frames* are properly finished.
- If the size of the data being fed to this streaming compressor is known,
- you can declare it before compression begins:
- >>> cctx = zstandard.ZstdCompressor()
- >>> with cctx.stream_writer(fh, size=data_len) as compressor:
- ... compressor.write(chunk0)
- ... compressor.write(chunk1)
- ... ...
- Declaring the size of the source data allows compression parameters to
- be tuned. And if ``write_content_size`` is used, it also results in the
- content size being written into the frame header of the output data.
- The size of chunks being ``write()`` to the destination can be specified:
- >>> cctx = zstandard.ZstdCompressor()
- >>> with cctx.stream_writer(fh, write_size=32768) as compressor:
- ... ...
- To see how much memory is being used by the streaming compressor:
- >>> cctx = zstandard.ZstdCompressor()
- >>> with cctx.stream_writer(fh) as compressor:
- ... ...
- ... byte_size = compressor.memory_size()
- The total number of bytes written so far is exposed via ``tell()``:
- >>> cctx = zstandard.ZstdCompressor()
- >>> with cctx.stream_writer(fh) as compressor:
- ... ...
- ... total_written = compressor.tell()
- ``stream_writer()`` accepts a ``write_return_read`` boolean argument to
- control the return value of ``write()``. When ``False`` (the default),
- ``write()`` returns the number of bytes that were ``write()``'en to the
- underlying object. When ``True``, ``write()`` returns the number of bytes
- read from the input that were subsequently written to the compressor.
- ``True`` is the *proper* behavior for ``write()`` as specified by the
- ``io.RawIOBase`` interface and will become the default value in a future
- release.
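- A sketch of the difference (``fh`` is any writable object; the byte
- counts are illustrative):
- >>> cctx = zstandard.ZstdCompressor()
- >>> with cctx.stream_writer(fh, write_return_read=True) as compressor:
- ...     compressor.write(b"data")  # returns 4: input bytes consumed
- >>> with cctx.stream_writer(fh, write_return_read=False) as compressor:
- ...     compressor.write(b"data")  # returns bytes written to fh, often 0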
- """
- def __init__(
- self,
- compressor,
- writer,
- source_size,
- write_size,
- write_return_read,
- closefd=True,
- ):
- self._compressor = compressor
- self._writer = writer
- self._write_size = write_size
- self._write_return_read = bool(write_return_read)
- self._closefd = bool(closefd)
- self._entered = False
- self._closing = False
- self._closed = False
- self._bytes_compressed = 0
- self._dst_buffer = ffi.new("char[]", write_size)
- self._out_buffer = ffi.new("ZSTD_outBuffer *")
- self._out_buffer.dst = self._dst_buffer
- self._out_buffer.size = len(self._dst_buffer)
- self._out_buffer.pos = 0
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- def __enter__(self):
- if self._closed:
- raise ValueError("stream is closed")
- if self._entered:
- raise ZstdError("cannot __enter__ multiple times")
- self._entered = True
- return self
- def __exit__(self, exc_type, exc_value, exc_tb):
- self._entered = False
- self.close()
- self._compressor = None
- return False
- def __iter__(self):
- raise io.UnsupportedOperation()
- def __next__(self):
- raise io.UnsupportedOperation()
- def memory_size(self):
- return lib.ZSTD_sizeof_CCtx(self._compressor._cctx)
- def fileno(self):
- f = getattr(self._writer, "fileno", None)
- if f:
- return f()
- else:
- raise OSError("fileno not available on underlying writer")
- def close(self):
- if self._closed:
- return
- try:
- self._closing = True
- self.flush(FLUSH_FRAME)
- finally:
- self._closing = False
- self._closed = True
- # Call close() on underlying stream as well.
- f = getattr(self._writer, "close", None)
- if self._closefd and f:
- f()
- @property
- def closed(self):
- return self._closed
- def isatty(self):
- return False
- def readable(self):
- return False
- def readline(self, size=-1):
- raise io.UnsupportedOperation()
- def readlines(self, hint=-1):
- raise io.UnsupportedOperation()
- def seek(self, offset, whence=None):
- raise io.UnsupportedOperation()
- def seekable(self):
- return False
- def truncate(self, size=None):
- raise io.UnsupportedOperation()
- def writable(self):
- return True
- def writelines(self, lines):
- raise NotImplementedError("writelines() is not yet implemented")
- def read(self, size=-1):
- raise io.UnsupportedOperation()
- def readall(self):
- raise io.UnsupportedOperation()
- def readinto(self, b):
- raise io.UnsupportedOperation()
- def write(self, data):
- """Send data to the compressor and possibly to the inner stream."""
- if self._closed:
- raise ValueError("stream is closed")
- total_write = 0
- data_buffer = ffi.from_buffer(data)
- in_buffer = ffi.new("ZSTD_inBuffer *")
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- out_buffer = self._out_buffer
- out_buffer.pos = 0
- while in_buffer.pos < in_buffer.size:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx,
- out_buffer,
- in_buffer,
- lib.ZSTD_e_continue,
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- self._writer.write(
- ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- )
- total_write += out_buffer.pos
- self._bytes_compressed += out_buffer.pos
- out_buffer.pos = 0
- if self._write_return_read:
- return in_buffer.pos
- else:
- return total_write
- def flush(self, flush_mode=FLUSH_BLOCK):
- """Evict data from compressor's internal state and write it to inner stream.
- Calling this method may result in 0 or more ``write()`` calls to the
- inner stream.
- This method will also call ``flush()`` on the inner stream, if such a
- method exists.
- :param flush_mode:
- How to flush the zstd compressor.
- ``zstandard.FLUSH_BLOCK`` will flush data already sent to the
- compressor but not emitted to the inner stream. The stream is still
- writable after calling this. This is the default behavior.
- See documentation for other ``zstandard.FLUSH_*`` constants for more
- flushing options.
- :return:
- Integer number of bytes written to the inner stream.
- """
- if flush_mode == FLUSH_BLOCK:
- flush = lib.ZSTD_e_flush
- elif flush_mode == FLUSH_FRAME:
- flush = lib.ZSTD_e_end
- else:
- raise ValueError("unknown flush_mode: %r" % flush_mode)
- if self._closed:
- raise ValueError("stream is closed")
- total_write = 0
- out_buffer = self._out_buffer
- out_buffer.pos = 0
- in_buffer = ffi.new("ZSTD_inBuffer *")
- in_buffer.src = ffi.NULL
- in_buffer.size = 0
- in_buffer.pos = 0
- while True:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, out_buffer, in_buffer, flush
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- self._writer.write(
- ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- )
- total_write += out_buffer.pos
- self._bytes_compressed += out_buffer.pos
- out_buffer.pos = 0
- if not zresult:
- break
- f = getattr(self._writer, "flush", None)
- if f and not self._closing:
- f()
- return total_write
- def tell(self):
- return self._bytes_compressed
- class ZstdCompressionObj(object):
- """A compressor conforming to the API in Python's standard library.
- This type implements an API similar to compression types in Python's
- standard library such as ``zlib.compressobj`` and ``bz2.BZ2Compressor``.
- This enables existing code targeting the standard library API to swap
- in this type to achieve zstd compression.
- .. important::
- The design of this API is not ideal for optimal performance.
- The reason performance is not optimal is because the API is limited to
- returning a single buffer holding compressed data. When compressing
- data, we don't know how much data will be emitted. So in order to
- capture all this data in a single buffer, we need to perform buffer
- reallocations and/or extra memory copies. This can add significant
- overhead depending on the size and nature of the compressed data and
- how often your application calls this type.
- If performance is critical, consider an API like
- :py:meth:`ZstdCompressor.stream_reader`,
- :py:meth:`ZstdCompressor.stream_writer`,
- :py:meth:`ZstdCompressor.chunker`, or
- :py:meth:`ZstdCompressor.read_to_iter`, which result in less overhead
- managing buffers.
- Instances are obtained by calling :py:meth:`ZstdCompressor.compressobj`.
- Here is how this API should be used:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cobj = cctx.compressobj()
- >>> data = cobj.compress(b"raw input 0")
- >>> data = cobj.compress(b"raw input 1")
- >>> data = cobj.flush()
- Or to flush blocks:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cobj = cctx.compressobj()
- >>> data = cobj.compress(b"chunk in first block")
- >>> data = cobj.flush(zstandard.COMPRESSOBJ_FLUSH_BLOCK)
- >>> data = cobj.compress(b"chunk in second block")
- >>> data = cobj.flush()
- For best performance results, keep input chunks under 256KB. This avoids
- extra allocations for a large output object.
- It is possible to declare the input size of the data that will be fed
- into the compressor:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cobj = cctx.compressobj(size=6)
- >>> data = cobj.compress(b"foobar")
- >>> data = cobj.flush()
- """
- def __init__(
- self, compressor, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE
- ):
- self._compressor = compressor
- self._out = ffi.new("ZSTD_outBuffer *")
- self._dst_buffer = ffi.new("char[]", write_size)
- self._out.dst = self._dst_buffer
- self._out.size = write_size
- self._out.pos = 0
- self._finished = False
- def compress(self, data):
- """Send data to the compressor.
- This method receives bytes to feed to the compressor and returns
- bytes constituting zstd compressed data.
- The zstd compressor accumulates bytes and the returned bytes may be
- substantially smaller or larger than the size of the input data on
- any given call. The returned value may be the empty byte string
- (``b""``).
- :param data:
- Data to write to the compressor.
- :return:
- Compressed data.
- """
- if self._finished:
- raise ZstdError("cannot call compress() after compressor finished")
- data_buffer = ffi.from_buffer(data)
- source = ffi.new("ZSTD_inBuffer *")
- source.src = data_buffer
- source.size = len(data_buffer)
- source.pos = 0
- chunks = []
- while source.pos < len(data):
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, self._out, source, lib.ZSTD_e_continue
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if self._out.pos:
- chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
- self._out.pos = 0
- return b"".join(chunks)
- def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
- """Emit data accumulated in the compressor that hasn't been outputted yet.
- The ``flush_mode`` argument controls how to end the stream.
- ``zstandard.COMPRESSOBJ_FLUSH_FINISH`` (the default) ends the
- compression stream and finishes a zstd frame. Once this type of flush
- is performed, ``compress()`` and ``flush()`` can no longer be called.
- This type of flush **must** be called to end the compression context. If
- not called, the emitted data may be incomplete and may not be readable
- by a decompressor.
- ``zstandard.COMPRESSOBJ_FLUSH_BLOCK`` will flush a zstd block. This
- ensures that all data fed to this instance will have been emitted and
- can be decoded by a decompressor. Flushes of this type can be performed
- multiple times. The next call to ``compress()`` will begin a new zstd
- block.
- :param flush_mode:
- How to flush the zstd compressor.
- :return:
- Compressed data.
- """
- if flush_mode not in (
- COMPRESSOBJ_FLUSH_FINISH,
- COMPRESSOBJ_FLUSH_BLOCK,
- ):
- raise ValueError("flush mode not recognized")
- if self._finished:
- raise ZstdError("compressor object already finished")
- if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
- z_flush_mode = lib.ZSTD_e_flush
- elif flush_mode == COMPRESSOBJ_FLUSH_FINISH:
- z_flush_mode = lib.ZSTD_e_end
- self._finished = True
- else:
- raise ZstdError("unhandled flush mode")
- assert self._out.pos == 0
- in_buffer = ffi.new("ZSTD_inBuffer *")
- in_buffer.src = ffi.NULL
- in_buffer.size = 0
- in_buffer.pos = 0
- chunks = []
- while True:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, self._out, in_buffer, z_flush_mode
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if self._out.pos:
- chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
- self._out.pos = 0
- if not zresult:
- break
- return b"".join(chunks)
- class ZstdCompressionChunker(object):
- """Compress data to uniformly sized chunks.
- This type allows you to iteratively feed chunks of data into a compressor
- and produce output chunks of uniform size.
- ``compress()``, ``flush()``, and ``finish()`` all return an iterator of
- ``bytes`` instances holding compressed data. The iterator may be empty.
- Callers MUST iterate through all elements of the returned iterator before
- performing another operation on the object or else the compressor's
- internal state may become confused. This can result in an exception being
- raised or malformed data being emitted.
- All chunks emitted by ``compress()`` will have a length of the configured
- chunk size.
- ``flush()`` and ``finish()`` may return a final chunk smaller than
- the configured chunk size.
- Instances are obtained by calling :py:meth:`ZstdCompressor.chunker`.
- Here is how the API should be used:
- >>> cctx = zstandard.ZstdCompressor()
- >>> chunker = cctx.chunker(chunk_size=32768)
- >>>
- >>> with open(path, 'rb') as fh:
- ... while True:
- ... in_chunk = fh.read(32768)
- ... if not in_chunk:
- ... break
- ...
- ... for out_chunk in chunker.compress(in_chunk):
- ... # Do something with output chunk of size 32768.
- ...
- ... for out_chunk in chunker.finish():
- ... # Do something with output chunks that finalize the zstd frame.
- This compressor type is often a better alternative to
- :py:meth:`ZstdCompressor.compressobj` because it has better performance
- properties.
- ``compressobj()`` will emit output data as it is available. This results
- in a *stream* of output chunks of varying sizes. The consistency of the
- output chunk size with ``chunker()`` is more appropriate for many usages,
- such as sending compressed data to a socket.
- ``compressobj()`` may also perform extra memory reallocations in order
- to dynamically adjust the sizes of the output chunks. Since ``chunker()``
- output chunks are all the same size (except for flushed or final chunks),
- there is less memory allocation/copying overhead.
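- ``flush()`` can also be called mid-stream to force out buffered data
- (a sketch, continuing the example above):
- >>> for out_chunk in chunker.flush():
- ...     pass  # data fed so far is now decodable by a decompressor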
- """
- def __init__(self, compressor, chunk_size):
- self._compressor = compressor
- self._out = ffi.new("ZSTD_outBuffer *")
- self._dst_buffer = ffi.new("char[]", chunk_size)
- self._out.dst = self._dst_buffer
- self._out.size = chunk_size
- self._out.pos = 0
- self._in = ffi.new("ZSTD_inBuffer *")
- self._in.src = ffi.NULL
- self._in.size = 0
- self._in.pos = 0
- self._finished = False
- def compress(self, data):
- """Feed new input data into the compressor.
- :param data:
- Data to feed to compressor.
- :return:
- Iterator of ``bytes`` representing chunks of compressed data.
- """
- if self._finished:
- raise ZstdError("cannot call compress() after compression finished")
- if self._in.src != ffi.NULL:
- raise ZstdError(
- "cannot perform operation before consuming output "
- "from previous operation"
- )
- data_buffer = ffi.from_buffer(data)
- if not len(data_buffer):
- return
- self._in.src = data_buffer
- self._in.size = len(data_buffer)
- self._in.pos = 0
- while self._in.pos < self._in.size:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, self._out, self._in, lib.ZSTD_e_continue
- )
- if self._in.pos == self._in.size:
- self._in.src = ffi.NULL
- self._in.size = 0
- self._in.pos = 0
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if self._out.pos == self._out.size:
- yield ffi.buffer(self._out.dst, self._out.pos)[:]
- self._out.pos = 0
- def flush(self):
- """Flushes all data currently in the compressor.
- :return:
- Iterator of ``bytes`` of compressed data.
- """
- if self._finished:
- raise ZstdError("cannot call flush() after compression finished")
- if self._in.src != ffi.NULL:
- raise ZstdError(
- "cannot call flush() before consuming output from "
- "previous operation"
- )
- while True:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if self._out.pos:
- yield ffi.buffer(self._out.dst, self._out.pos)[:]
- self._out.pos = 0
- if not zresult:
- return
- def finish(self):
- """Signals the end of input data.
- No new data can be compressed after this method is called.
- This method will flush buffered data and finish the zstd frame.
- :return:
- Iterator of ``bytes`` of compressed data.
- """
- if self._finished:
- raise ZstdError("cannot call finish() after compression finished")
- if self._in.src != ffi.NULL:
- raise ZstdError(
- "cannot call finish() before consuming output from "
- "previous operation"
- )
- while True:
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if self._out.pos:
- yield ffi.buffer(self._out.dst, self._out.pos)[:]
- self._out.pos = 0
- if not zresult:
- self._finished = True
- return
- class ZstdCompressionReader(object):
- """Readable compressing stream wrapper.
- ``ZstdCompressionReader`` is a read-only stream interface for obtaining
- compressed data from a source.
- This type conforms to the ``io.RawIOBase`` interface and should be usable
- by any type that operates against a *file-object* (``typing.BinaryIO``
- in Python type hinting speak).
- Instances are neither writable nor seekable (even if the underlying
- source is seekable). ``readline()`` and ``readlines()`` are not implemented
- because they don't make sense for compressed data. ``tell()`` returns the
- number of compressed bytes emitted so far.
- Instances are obtained by calling :py:meth:`ZstdCompressor.stream_reader`.
- In this example, we open a file for reading and then wrap that file
- handle with a stream from which compressed data can be ``read()``.
- >>> with open(path, 'rb') as fh:
- ... cctx = zstandard.ZstdCompressor()
- ... reader = cctx.stream_reader(fh)
- ... while True:
- ... chunk = reader.read(16384)
- ... if not chunk:
- ... break
- ...
- ... # Do something with compressed chunk.
- Instances can also be used as context managers:
- >>> with open(path, 'rb') as fh:
- ... cctx = zstandard.ZstdCompressor()
- ... with cctx.stream_reader(fh) as reader:
- ... while True:
- ... chunk = reader.read(16384)
- ... if not chunk:
- ... break
- ...
- ... # Do something with compressed chunk.
- When the context manager exits or ``close()`` is called, the stream is
- closed, underlying resources are released, and future operations against
- the compression stream will fail.
- ``stream_reader()`` accepts a ``size`` argument specifying how large the
- input stream is. This is used to adjust compression parameters so they are
- tailored to the source size. e.g.
- >>> with open(path, 'rb') as fh:
- ... cctx = zstandard.ZstdCompressor()
- ... with cctx.stream_reader(fh, size=os.stat(path).st_size) as reader:
- ... ...
- If the ``source`` is a stream, you can specify how large ``read()``
- requests to that stream should be via the ``read_size`` argument.
- It defaults to ``zstandard.COMPRESSION_RECOMMENDED_INPUT_SIZE``. e.g.
- >>> with open(path, 'rb') as fh:
- ... cctx = zstandard.ZstdCompressor()
- ... # Will perform fh.read(8192) when obtaining data to feed into the
- ... # compressor.
- ... with cctx.stream_reader(fh, read_size=8192) as reader:
- ... ...
- """
- def __init__(self, compressor, source, read_size, closefd=True):
- self._compressor = compressor
- self._source = source
- self._read_size = read_size
- self._closefd = closefd
- self._entered = False
- self._closed = False
- self._bytes_compressed = 0
- self._finished_input = False
- self._finished_output = False
- self._in_buffer = ffi.new("ZSTD_inBuffer *")
- # Holds a ref so backing bytes in self._in_buffer stay alive.
- self._source_buffer = None
- def __enter__(self):
- if self._entered:
- raise ValueError("cannot __enter__ multiple times")
- if self._closed:
- raise ValueError("stream is closed")
- self._entered = True
- return self
- def __exit__(self, exc_type, exc_value, exc_tb):
- self._entered = False
- self._compressor = None
- self.close()
- self._source = None
- return False
- def readable(self):
- return True
- def writable(self):
- return False
- def seekable(self):
- return False
- def readline(self):
- raise io.UnsupportedOperation()
- def readlines(self):
- raise io.UnsupportedOperation()
- def write(self, data):
- raise OSError("stream is not writable")
- def writelines(self, ignored):
- raise OSError("stream is not writable")
- def isatty(self):
- return False
- def flush(self):
- return None
- def close(self):
- if self._closed:
- return
- self._closed = True
- f = getattr(self._source, "close", None)
- if self._closefd and f:
- f()
- @property
- def closed(self):
- return self._closed
- def tell(self):
- return self._bytes_compressed
- def readall(self):
- chunks = []
- while True:
- chunk = self.read(1048576)
- if not chunk:
- break
- chunks.append(chunk)
- return b"".join(chunks)
- def __iter__(self):
- raise io.UnsupportedOperation()
- def __next__(self):
- raise io.UnsupportedOperation()
- next = __next__
- def _read_input(self):
- if self._finished_input:
- return
- if hasattr(self._source, "read"):
- data = self._source.read(self._read_size)
- if not data:
- self._finished_input = True
- return
- self._source_buffer = ffi.from_buffer(data)
- self._in_buffer.src = self._source_buffer
- self._in_buffer.size = len(self._source_buffer)
- self._in_buffer.pos = 0
- else:
- self._source_buffer = ffi.from_buffer(self._source)
- self._in_buffer.src = self._source_buffer
- self._in_buffer.size = len(self._source_buffer)
- self._in_buffer.pos = 0
- def _compress_into_buffer(self, out_buffer):
- if self._in_buffer.pos >= self._in_buffer.size:
- return
- old_pos = out_buffer.pos
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx,
- out_buffer,
- self._in_buffer,
- lib.ZSTD_e_continue,
- )
- self._bytes_compressed += out_buffer.pos - old_pos
- if self._in_buffer.pos == self._in_buffer.size:
- self._in_buffer.src = ffi.NULL
- self._in_buffer.pos = 0
- self._in_buffer.size = 0
- self._source_buffer = None
- if not hasattr(self._source, "read"):
- self._finished_input = True
- if lib.ZSTD_isError(zresult):
- raise ZstdError("zstd compress error: %s", _zstd_error(zresult))
- return out_buffer.pos and out_buffer.pos == out_buffer.size
- def read(self, size=-1):
- if self._closed:
- raise ValueError("stream is closed")
- if size < -1:
- raise ValueError("cannot read negative amounts less than -1")
- if size == -1:
- return self.readall()
- if self._finished_output or size == 0:
- return b""
- # Need a dedicated ref to dest buffer otherwise it gets collected.
- dst_buffer = ffi.new("char[]", size)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dst_buffer
- out_buffer.size = size
- out_buffer.pos = 0
- if self._compress_into_buffer(out_buffer):
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- while not self._finished_input:
- self._read_input()
- if self._compress_into_buffer(out_buffer):
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- # EOF
- old_pos = out_buffer.pos
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
- )
- self._bytes_compressed += out_buffer.pos - old_pos
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if zresult == 0:
- self._finished_output = True
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- def read1(self, size=-1):
- if self._closed:
- raise ValueError("stream is closed")
- if size < -1:
- raise ValueError("cannot read negative amounts less than -1")
- if self._finished_output or size == 0:
- return b""
- # -1 returns arbitrary number of bytes.
- if size == -1:
- size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
- dst_buffer = ffi.new("char[]", size)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dst_buffer
- out_buffer.size = size
- out_buffer.pos = 0
- # read1() dictates that we can perform at most 1 call to the
- # underlying stream to get input. However, we can't satisfy this
- # restriction with compression because not all input generates output.
- # It is possible to perform a block flush in order to ensure output.
- # But this may not be desirable behavior. So we allow multiple read()
- # to the underlying stream. But unlike read(), we stop once we have
- # any output.
- self._compress_into_buffer(out_buffer)
- if out_buffer.pos:
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- while not self._finished_input:
- self._read_input()
- # If we've filled the output buffer, return immediately.
- if self._compress_into_buffer(out_buffer):
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- # If we've populated the output buffer and we're not at EOF,
- # also return, as we've satisfied the read1() limits.
- if out_buffer.pos and not self._finished_input:
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- # Else if we're at EOS and we have room left in the buffer,
- # fall through to below and try to add more data to the output.
- # EOF.
- old_pos = out_buffer.pos
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
- )
- self._bytes_compressed += out_buffer.pos - old_pos
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if zresult == 0:
- self._finished_output = True
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- def readinto(self, b):
- if self._closed:
- raise ValueError("stream is closed")
- if self._finished_output:
- return 0
- # TODO use writable=True once we require CFFI >= 1.12.
- dest_buffer = ffi.from_buffer(b)
- ffi.memmove(b, b"", 0)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dest_buffer
- out_buffer.size = len(dest_buffer)
- out_buffer.pos = 0
- if self._compress_into_buffer(out_buffer):
- return out_buffer.pos
- while not self._finished_input:
- self._read_input()
- if self._compress_into_buffer(out_buffer):
- return out_buffer.pos
- # EOF.
- old_pos = out_buffer.pos
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
- )
- self._bytes_compressed += out_buffer.pos - old_pos
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if zresult == 0:
- self._finished_output = True
- return out_buffer.pos
- def readinto1(self, b):
- if self._closed:
- raise ValueError("stream is closed")
- if self._finished_output:
- return 0
- # TODO use writable=True once we require CFFI >= 1.12.
- dest_buffer = ffi.from_buffer(b)
- ffi.memmove(b, b"", 0)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dest_buffer
- out_buffer.size = len(dest_buffer)
- out_buffer.pos = 0
- self._compress_into_buffer(out_buffer)
- if out_buffer.pos:
- return out_buffer.pos
- while not self._finished_input:
- self._read_input()
- if self._compress_into_buffer(out_buffer):
- return out_buffer.pos
- if out_buffer.pos and not self._finished_input:
- return out_buffer.pos
- # EOF.
- old_pos = out_buffer.pos
- zresult = lib.ZSTD_compressStream2(
- self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
- )
- self._bytes_compressed += out_buffer.pos - old_pos
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if zresult == 0:
- self._finished_output = True
- return out_buffer.pos
- class ZstdCompressor(object):
- """
- Create an object used to perform Zstandard compression.
- Each instance is essentially a wrapper around a ``ZSTD_CCtx`` from
- zstd's C API.
- An instance can compress data various ways. Instances can be used
- multiple times. Each compression operation will use the compression
- parameters defined at construction time.
- .. note::
- When using a compression dictionary and multiple compression
- operations are performed, the ``ZstdCompressionParameters`` derived
- from an integer compression ``level`` and the first compressed data's
- size will be reused for all subsequent operations. This may not be
- desirable if source data sizes vary significantly.
- ``compression_params`` is mutually exclusive with ``level``,
- ``write_checksum``, ``write_content_size``, ``write_dict_id``, and
- ``threads``.
- Assume that each ``ZstdCompressor`` instance can only handle a single
- logical compression operation at a time. i.e. if you call a method
- like ``stream_reader()`` to obtain multiple objects derived from the same
- ``ZstdCompressor`` instance and attempt to use them simultaneously, errors
- will likely occur.
- If you need to perform multiple logical compression operations and you
- can't guarantee those operations are temporally non-overlapping, you need
- to obtain multiple ``ZstdCompressor`` instances.
- Unless specified otherwise, assume that no two methods of
- ``ZstdCompressor`` instances can be called from multiple Python
- threads simultaneously. In other words, assume instances are not thread safe
- unless stated otherwise.
- :param level:
- Integer compression level. Valid values are all negative integers
- through 22. Lower values generally yield faster operations with lower
- compression ratios. Higher values are generally slower but compress
- better. The default is 3, which is what the ``zstd`` CLI uses. Negative
- levels effectively engage ``--fast`` mode from the ``zstd`` CLI.
- :param dict_data:
- A ``ZstdCompressionDict`` to be used to compress with dictionary
- data.
- :param compression_params:
- A ``ZstdCompressionParameters`` instance defining low-level compression
- parameters. If defined, this will overwrite the ``level`` argument.
- :param write_checksum:
- If True, a 4 byte content checksum will be written with the compressed
- data, allowing the decompressor to perform content verification.
- :param write_content_size:
- If True (the default), the decompressed content size will be included
- in the header of the compressed data. This data will only be written if
- the compressor knows the size of the input data.
- :param write_dict_id:
- Determines whether the dictionary ID will be written into the compressed
- data. Defaults to True. Only adds content to the compressed data if
- a dictionary is being used.
- :param threads:
- Number of threads to use to compress data concurrently. When set,
- compression operations are performed on multiple threads. The default
- value (0) disables multi-threaded compression. A value of ``-1`` means
- to set the number of threads to the number of detected logical CPUs.
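- A minimal construction and usage sketch:
- >>> cctx = zstandard.ZstdCompressor(level=10, write_checksum=True)
- >>> compressed = cctx.compress(b"data to compress")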
- """
- def __init__(
- self,
- level=3,
- dict_data=None,
- compression_params=None,
- write_checksum=None,
- write_content_size=None,
- write_dict_id=None,
- threads=0,
- ):
- if level > lib.ZSTD_maxCLevel():
- raise ValueError(
- "level must be less than %d" % lib.ZSTD_maxCLevel()
- )
- if threads < 0:
- threads = _cpu_count()
- if compression_params and write_checksum is not None:
- raise ValueError(
- "cannot define compression_params and write_checksum"
- )
- if compression_params and write_content_size is not None:
- raise ValueError(
- "cannot define compression_params and write_content_size"
- )
- if compression_params and write_dict_id is not None:
- raise ValueError(
- "cannot define compression_params and write_dict_id"
- )
- if compression_params and threads:
- raise ValueError("cannot define compression_params and threads")
- if compression_params:
- self._params = _make_cctx_params(compression_params)
- else:
- if write_dict_id is None:
- write_dict_id = True
- params = lib.ZSTD_createCCtxParams()
- if params == ffi.NULL:
- raise MemoryError()
- self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams)
- _set_compression_parameter(
- self._params, lib.ZSTD_c_compressionLevel, level
- )
- _set_compression_parameter(
- self._params,
- lib.ZSTD_c_contentSizeFlag,
- write_content_size if write_content_size is not None else 1,
- )
- _set_compression_parameter(
- self._params,
- lib.ZSTD_c_checksumFlag,
- 1 if write_checksum else 0,
- )
- _set_compression_parameter(
- self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0
- )
- if threads:
- _set_compression_parameter(
- self._params, lib.ZSTD_c_nbWorkers, threads
- )
- cctx = lib.ZSTD_createCCtx()
- if cctx == ffi.NULL:
- raise MemoryError()
- self._cctx = cctx
- self._dict_data = dict_data
- # We defer setting up garbage collection until after calling
- # _setup_cctx() to ensure the memory size estimate is more accurate.
- try:
- self._setup_cctx()
- finally:
- self._cctx = ffi.gc(
- cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx)
- )
- def _setup_cctx(self):
- zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(
- self._cctx, self._params
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "could not set compression parameters: %s"
- % _zstd_error(zresult)
- )
- dict_data = self._dict_data
- if dict_data:
- if dict_data._cdict:
- zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict)
- else:
- zresult = lib.ZSTD_CCtx_loadDictionary_advanced(
- self._cctx,
- dict_data.as_bytes(),
- len(dict_data),
- lib.ZSTD_dlm_byRef,
- dict_data._dict_type,
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "could not load compression dictionary: %s"
- % _zstd_error(zresult)
- )
- def memory_size(self):
- """Obtain the memory usage of this compressor, in bytes.
- >>> cctx = zstandard.ZstdCompressor()
- >>> memory = cctx.memory_size()
- """
- return lib.ZSTD_sizeof_CCtx(self._cctx)
- def compress(self, data):
- """
- Compress data in a single operation.
- This is the simplest mechanism to perform compression: simply pass in a
- value and get a compressed value back. It is also the mechanism most
- prone to abuse.
- The input and output values must fit in memory, so passing in very large
- values can result in excessive memory usage. For this reason, one of the
- streaming based APIs is preferred for larger values.
- :param data:
- Source data to compress
- :return:
- Compressed data
- >>> cctx = zstandard.ZstdCompressor()
- >>> compressed = cctx.compress(b"data to compress")
- """
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- data_buffer = ffi.from_buffer(data)
- dest_size = lib.ZSTD_compressBound(len(data_buffer))
- out = new_nonzero("char[]", dest_size)
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer))
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- out_buffer = ffi.new("ZSTD_outBuffer *")
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer.dst = out
- out_buffer.size = dest_size
- out_buffer.pos = 0
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- zresult = lib.ZSTD_compressStream2(
- self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError("cannot compress: %s" % _zstd_error(zresult))
- elif zresult:
- raise ZstdError("unexpected partial frame flush")
- return ffi.buffer(out, out_buffer.pos)[:]
- def compressobj(self, size=-1):
- """
- Obtain a compressor exposing the Python standard library compression API.
- See :py:class:`ZstdCompressionObj` for the full documentation.
- :param size:
- Size in bytes of data that will be compressed.
- :return:
- :py:class:`ZstdCompressionObj`
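- A short usage sketch; ``flush()`` finishes the zstd frame:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cobj = cctx.compressobj()
- >>> chunks = [cobj.compress(b"chunk 0"), cobj.compress(b"chunk 1")]
- >>> chunks.append(cobj.flush())
- >>> compressed = b"".join(chunks)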
- """
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- cobj = ZstdCompressionObj(self, COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
- return cobj
- def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
- """
- Create an object for iterative compressing to same-sized chunks.
- This API is similar to :py:meth:`ZstdCompressor.compressobj` but has
- better performance properties.
- :param size:
- Size in bytes of data that will be compressed.
- :param chunk_size:
- Size of compressed chunks.
- :return:
- :py:class:`ZstdCompressionChunker`
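- A short usage sketch (``in_stream`` is a hypothetical readable object):
- >>> cctx = zstandard.ZstdCompressor()
- >>> chunker = cctx.chunker(chunk_size=32768)
- >>> while True:
- ... in_data = in_stream.read(32768)
- ... if not in_data:
- ... break
- ... for out_chunk in chunker.compress(in_data):
- ... # Do something with compressed chunk.
- >>> for out_chunk in chunker.finish():
- ... # Do something with the final compressed chunks.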
- """
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- return ZstdCompressionChunker(self, chunk_size=chunk_size)
- def copy_stream(
- self,
- ifh,
- ofh,
- size=-1,
- read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
- write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- ):
- """
- Copy data between 2 streams while compressing it.
- Data will be read from ``ifh``, compressed, and written to ``ofh``.
- ``ifh`` must have a ``read(size)`` method. ``ofh`` must have a
- ``write(data)``
- method.
- >>> cctx = zstandard.ZstdCompressor()
- >>> with open(input_path, "rb") as ifh, open(output_path, "wb") as ofh:
- ... cctx.copy_stream(ifh, ofh)
- It is also possible to declare the size of the source stream:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cctx.copy_stream(ifh, ofh, size=len_of_input)
- You can also specify how large the chunks that are ``read()``
- and ``write()`` from and to the streams:
- >>> cctx = zstandard.ZstdCompressor()
- >>> cctx.copy_stream(ifh, ofh, read_size=32768, write_size=16384)
- The stream copier returns a 2-tuple of bytes read and written:
- >>> cctx = zstandard.ZstdCompressor()
- >>> read_count, write_count = cctx.copy_stream(ifh, ofh)
- :param ifh:
- Source stream to read from
- :param ofh:
- Destination stream to write to
- :param size:
- Size in bytes of the source stream. If defined, compression
- parameters will be tuned for this size.
- :param read_size:
- Chunk sizes that source stream should be ``read()`` from.
- :param write_size:
- Chunk sizes that destination stream should be ``write()`` to.
- :return:
- 2-tuple of ints of bytes read and written, respectively.
- """
- if not hasattr(ifh, "read"):
- raise ValueError("first argument must have a read() method")
- if not hasattr(ofh, "write"):
- raise ValueError("second argument must have a write() method")
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- dst_buffer = ffi.new("char[]", write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = write_size
- out_buffer.pos = 0
- total_read, total_write = 0, 0
- while True:
- data = ifh.read(read_size)
- if not data:
- break
- data_buffer = ffi.from_buffer(data)
- total_read += len(data_buffer)
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- while in_buffer.pos < in_buffer.size:
- zresult = lib.ZSTD_compressStream2(
- self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
- total_write += out_buffer.pos
- out_buffer.pos = 0
- # We've finished reading. Flush the compressor.
- while True:
- zresult = lib.ZSTD_compressStream2(
- self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
- total_write += out_buffer.pos
- out_buffer.pos = 0
- if zresult == 0:
- break
- return total_read, total_write
- def stream_reader(
- self,
- source,
- size=-1,
- read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
- closefd=True,
- ):
- """
- Wrap a readable source with a stream that can read compressed data.
- This will produce an object conforming to the ``io.RawIOBase``
- interface which can be ``read()`` from to retrieve compressed data
- from a source.
- The source object can be any object with a ``read(size)`` method
- or an object that conforms to the buffer protocol.
- See :py:class:`ZstdCompressionReader` for type documentation and usage
- examples.
- :param source:
- Object to read source data from
- :param size:
- Size in bytes of source object.
- :param read_size:
- How many bytes to request when ``read()``'ing from the source.
- :param closefd:
- Whether to close the source stream when the returned stream is
- closed.
- :return:
- :py:class:`ZstdCompressionReader`
- """
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- try:
- size = len(source)
- except Exception:
- pass
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- return ZstdCompressionReader(self, source, read_size, closefd=closefd)
- def stream_writer(
- self,
- writer,
- size=-1,
- write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- write_return_read=True,
- closefd=True,
- ):
- """
- Create a stream that will write compressed data into another stream.
- The argument to ``stream_writer()`` must have a ``write(data)`` method.
- As compressed data is available, ``write()`` will be called with the
- compressed data as its argument. Many common Python types implement
- ``write()``, including open file handles and ``io.BytesIO``.
- See :py:class:`ZstdCompressionWriter` for more documentation, including
- usage examples.
- :param writer:
- Stream to write compressed data to.
- :param size:
- Size in bytes of data to be compressed. If set, it will be used
- to influence compression parameter tuning and could result in the
- size being written into the header of the compressed data.
- :param write_size:
- How much data to ``write()`` to ``writer`` at a time.
- :param write_return_read:
- Whether ``write()`` should return the number of bytes that were
- consumed from the input.
- :param closefd:
- Whether to ``close`` the ``writer`` when this stream is closed.
- :return:
- :py:class:`ZstdCompressionWriter`
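- A short usage sketch (``path`` is a hypothetical file path):
- >>> cctx = zstandard.ZstdCompressor()
- >>> with open(path, "wb") as fh:
- ... with cctx.stream_writer(fh) as compressor:
- ... compressor.write(b"data to compress")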
- """
- if not hasattr(writer, "write"):
- raise ValueError("must pass an object with a write() method")
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- return ZstdCompressionWriter(
- self, writer, size, write_size, write_return_read, closefd=closefd
- )
- def read_to_iter(
- self,
- reader,
- size=-1,
- read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
- write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- ):
- """
- Read uncompressed data from a reader and return an iterator.
- Returns an iterator of compressed data produced from reading from
- ``reader``.
- This method provides a mechanism to stream compressed data out of a
- source as an iterator of data chunks.
- Uncompressed data will be obtained from ``reader`` by calling the
- ``read(size)`` method of it or by reading a slice (if ``reader``
- conforms to the *buffer protocol*). The source data will be streamed
- into a compressor. As compressed data is available, it will be exposed
- to the iterator.
- Data is read from the source in chunks of ``read_size``. Compressed
- chunks are at most ``write_size`` bytes. Both values default to the
- zstd input and output defaults, respectively.
- If reading from the source via ``read()``, ``read()`` will be called
- until it raises or returns an empty bytes (``b""``). It is perfectly
- valid for the source to deliver fewer bytes than were requested
- by ``read(size)``.
- The caller is partially in control of how fast data is fed into the
- compressor by how it consumes the returned iterator. The compressor
- will not consume from the reader unless the caller consumes from the
- iterator.
- >>> cctx = zstandard.ZstdCompressor()
- >>> for chunk in cctx.read_to_iter(fh):
- ... # Do something with emitted data.
- ``read_to_iter()`` accepts a ``size`` argument declaring the size of
- the input stream:
- >>> cctx = zstandard.ZstdCompressor()
- >>> for chunk in cctx.read_to_iter(fh, size=some_int):
- ... pass
- You can also control the size that data is ``read()`` from the source
- and the ideal size of output chunks:
- >>> cctx = zstandard.ZstdCompressor()
- >>> for chunk in cctx.read_to_iter(fh, read_size=16384, write_size=8192):
- ... pass
- ``read_to_iter()`` does not give direct control over the sizes of chunks
- fed into the compressor. Instead, chunk sizes will be whatever the object
- being read from delivers. These will often be of a uniform size.
- :param reader:
- Stream providing data to be compressed.
- :param size:
- Size in bytes of input data.
- :param read_size:
- Controls how many bytes are ``read()`` from the source.
- :param write_size:
- Controls the output size of emitted chunks.
- :return:
- Iterator of ``bytes``.
- """
- if hasattr(reader, "read"):
- have_read = True
- elif hasattr(reader, "__getitem__"):
- have_read = False
- buffer_offset = 0
- size = len(reader)
- else:
- raise ValueError(
- "must pass an object with a read() method or "
- "conforms to buffer protocol"
- )
- lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
- if size < 0:
- size = lib.ZSTD_CONTENTSIZE_UNKNOWN
- zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error setting source size: %s" % _zstd_error(zresult)
- )
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- in_buffer.src = ffi.NULL
- in_buffer.size = 0
- in_buffer.pos = 0
- dst_buffer = ffi.new("char[]", write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = write_size
- out_buffer.pos = 0
- while True:
- # We should never have output data sitting around after a previous
- # iteration.
- assert out_buffer.pos == 0
- # Collect input data.
- if have_read:
- read_result = reader.read(read_size)
- else:
- remaining = len(reader) - buffer_offset
- slice_size = min(remaining, read_size)
- read_result = reader[buffer_offset : buffer_offset + slice_size]
- buffer_offset += slice_size
- # No new input data. Break out of the read loop.
- if not read_result:
- break
- # Feed all read data into the compressor and emit output until
- # exhausted.
- read_buffer = ffi.from_buffer(read_result)
- in_buffer.src = read_buffer
- in_buffer.size = len(read_buffer)
- in_buffer.pos = 0
- while in_buffer.pos < in_buffer.size:
- zresult = lib.ZSTD_compressStream2(
- self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd compress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- out_buffer.pos = 0
- yield data
- assert out_buffer.pos == 0
- # And repeat the loop to collect more data.
- continue
- # If we get here, input is exhausted. End the stream and emit what
- # remains.
- while True:
- assert out_buffer.pos == 0
- zresult = lib.ZSTD_compressStream2(
- self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "error ending compression stream: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- out_buffer.pos = 0
- yield data
- if zresult == 0:
- break
- def multi_compress_to_buffer(self, data, threads=-1):
- """
- Compress multiple pieces of data as a single function call.
- (Experimental. Not yet supported by CFFI backend.)
- This function is optimized to perform multiple compression operations
- as quickly as possible with as little overhead as possible.
- Data to be compressed can be passed as a ``BufferWithSegmentsCollection``,
- a ``BufferWithSegments``, or a list containing byte like objects. Each
- element of the container will be compressed individually using the
- configured parameters on the ``ZstdCompressor`` instance.
- The ``threads`` argument controls how many threads to use for
- compression. Negative values (the default) use as many threads as there
- are detected logical CPUs. A value of ``0`` means to use a single thread.
- The function returns a ``BufferWithSegmentsCollection``. This type
- represents N discrete memory allocations, each holding 1 or more
- compressed frames.
- Output data is written to shared memory buffers. This means that unlike
- regular Python objects, a reference to *any* object within the collection
- keeps the shared buffer and therefore memory backing it alive. This can
- have undesirable effects on process memory usage.
- The API and behavior of this function is experimental and will likely
- change. Known deficiencies include:
- * If asked to use multiple threads, it will always spawn that many
- threads, even if the input is too small to use them. It should
- automatically lower the thread count when the extra threads would
- just add overhead.
- * The buffer allocation strategy is fixed. There is room to make it
- dynamic, perhaps even to allow one output buffer per input,
- facilitating a variation of the API to return a list without the
- adverse effects of shared memory buffers.
- :param data:
- Source to read discrete pieces of data to compress.
- Can be a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``,
- or a ``list[bytes]``.
- :return:
- BufferWithSegmentsCollection holding compressed data.
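- With the C backend, usage would look like this sketch:
- >>> cctx = zstandard.ZstdCompressor()
- >>> collection = cctx.multi_compress_to_buffer([b"data 0", b"data 1"])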
- """
- raise NotImplementedError()
- def frame_progression(self):
- """
- Return information on how much work the compressor has done.
- Returns a 3-tuple of (ingested, consumed, produced).
- >>> cctx = zstandard.ZstdCompressor()
- >>> (ingested, consumed, produced) = cctx.frame_progression()
- """
- progression = lib.ZSTD_getFrameProgression(self._cctx)
- return progression.ingested, progression.consumed, progression.produced
- class FrameParameters(object):
- """Information about a zstd frame.
- Instances have the following attributes:
- ``content_size``
- Integer size of original, uncompressed content. This will be ``0`` if the
- original content size isn't written to the frame (controlled with the
- ``write_content_size`` argument to ``ZstdCompressor``) or if the input
- content size was ``0``.
- ``window_size``
- Integer size of maximum back-reference distance in compressed data.
- ``dict_id``
- Integer of dictionary ID used for compression. ``0`` if no dictionary
- ID was used or if the dictionary ID was ``0``.
- ``has_checksum``
- Bool indicating whether a 4 byte content checksum is stored at the end
- of the frame.
- """
- def __init__(self, fparams):
- self.content_size = fparams.frameContentSize
- self.window_size = fparams.windowSize
- self.dict_id = fparams.dictID
- self.has_checksum = bool(fparams.checksumFlag)
- def frame_content_size(data):
- """Obtain the decompressed size of a frame.
- The returned value is usually accurate. But strictly speaking it should
- not be trusted.
- :return:
- ``-1`` if size unknown and a non-negative integer otherwise.
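- For example, with default parameters the frame records the content size:
- >>> frame = zstandard.ZstdCompressor().compress(b"data")
- >>> zstandard.frame_content_size(frame)
- 4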
- """
- data_buffer = ffi.from_buffer(data)
- size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))
- if size == lib.ZSTD_CONTENTSIZE_ERROR:
- raise ZstdError("error when determining content size")
- elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
- return -1
- else:
- return size
- def frame_header_size(data):
- """Obtain the size of a frame header.
- :return:
- Integer size in bytes.
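- For example:
- >>> frame = zstandard.ZstdCompressor().compress(b"data")
- >>> header_size = zstandard.frame_header_size(frame)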
- """
- data_buffer = ffi.from_buffer(data)
- zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer))
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "could not determine frame header size: %s" % _zstd_error(zresult)
- )
- return zresult
- def get_frame_parameters(data, format=FORMAT_ZSTD1):
- """
- Parse a zstd frame header into frame parameters.
- Depending on which fields are present in the frame and their values, the
- length of the frame parameters varies. If insufficient bytes are passed
- in to fully parse the frame parameters, ``ZstdError`` is raised. To ensure
- frame parameters can be parsed, pass in at least 18 bytes.
- :param data:
- Data from which to read frame parameters.
- :param format:
- Set the format of data for the decoder.
- :return:
- :py:class:`FrameParameters`
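- For example, assuming default compressor settings plus a checksum:
- >>> cctx = zstandard.ZstdCompressor(write_checksum=True)
- >>> frame = cctx.compress(b"foobar")
- >>> params = zstandard.get_frame_parameters(frame)
- >>> params.content_size
- 6
- >>> params.has_checksum
- True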
- """
- params = ffi.new("ZSTD_FrameHeader *")
- data_buffer = ffi.from_buffer(data)
- zresult = lib.ZSTD_getFrameHeader_advanced(
- params, data_buffer, len(data_buffer), format
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "cannot get frame parameters: %s" % _zstd_error(zresult)
- )
- if zresult:
- raise ZstdError(
- "not enough data for frame parameters; need %d bytes" % zresult
- )
- return FrameParameters(params[0])
- class ZstdCompressionDict(object):
- """Represents a computed compression dictionary.
- Instances are obtained by calling :py:func:`train_dictionary` or by
- passing bytes obtained from another source into the constructor.
- Instances can be constructed from bytes:
- >>> dict_data = zstandard.ZstdCompressionDict(data)
- It is possible to construct a dictionary from *any* data. If the data
- doesn't begin with a magic header, it will be treated as a *prefix*
- dictionary. *Prefix* dictionaries allow compression operations to
- reference raw data within the dictionary.
- It is possible to force the use of *prefix* dictionaries or to require
- a dictionary header:
- >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_RAWCONTENT)
- >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_FULLDICT)
- You can see how many bytes are in the dictionary by calling ``len()``:
- >>> dict_data = zstandard.train_dictionary(size, samples)
- >>> dict_size = len(dict_data) # will not be larger than ``size``
- Once you have a dictionary, you can pass it to the objects performing
- compression and decompression:
- >>> dict_data = zstandard.train_dictionary(131072, samples)
- >>> cctx = zstandard.ZstdCompressor(dict_data=dict_data)
- >>> for source_data in input_data:
- ... compressed = cctx.compress(source_data)
- ... # Do something with compressed data.
- ...
- >>> dctx = zstandard.ZstdDecompressor(dict_data=dict_data)
- >>> for compressed_data in input_data:
- ... buffer = io.BytesIO()
- ... with dctx.stream_writer(buffer) as decompressor:
- ... decompressor.write(compressed_data)
- ... # Do something with raw data in ``buffer``.
- Dictionaries have unique integer IDs. You can retrieve this ID via:
- >>> dict_id = dict_data.dict_id()
- You can obtain the raw data in the dict (useful for persisting and constructing
- a ``ZstdCompressionDict`` later) via ``as_bytes()``:
- >>> dict_data = zstandard.train_dictionary(size, samples)
- >>> raw_data = dict_data.as_bytes()
- By default, when a ``ZstdCompressionDict`` is *attached* to a
- ``ZstdCompressor``, each ``ZstdCompressor`` performs work to prepare the
- dictionary for use. This is fine if only 1 compression operation is being
- performed or if the ``ZstdCompressor`` is being reused for multiple operations.
- But if multiple ``ZstdCompressor`` instances are being used with the dictionary,
- this can add overhead.
- It is possible to *precompute* the dictionary so it can readily be consumed
- by multiple ``ZstdCompressor`` instances:
- >>> d = zstandard.ZstdCompressionDict(data)
- >>> # Precompute for compression level 3.
- >>> d.precompute_compress(level=3)
- >>> # Precompute with specific compression parameters.
- >>> params = zstandard.ZstdCompressionParameters(...)
- >>> d.precompute_compress(compression_params=params)
- .. note::
- When a dictionary is precomputed, the compression parameters used to
- precompute the dictionary overwrite some of the compression parameters
- specified to ``ZstdCompressor``.
- :param data:
- Dictionary data.
- :param dict_type:
- Type of dictionary. One of the ``DICT_TYPE_*`` constants.
- """
- def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0):
- assert isinstance(data, bytes)
- self._data = data
- self.k = k
- self.d = d
- if dict_type not in (
- DICT_TYPE_AUTO,
- DICT_TYPE_RAWCONTENT,
- DICT_TYPE_FULLDICT,
- ):
- raise ValueError(
- "invalid dictionary load mode: %d; must use "
- "DICT_TYPE_* constants" % dict_type
- )
- self._dict_type = dict_type
- self._cdict = None
- def __len__(self):
- return len(self._data)
- def dict_id(self):
- """Obtain the integer ID of the dictionary."""
- return int(lib.ZDICT_getDictID(self._data, len(self._data)))
- def as_bytes(self):
- """Obtain the ``bytes`` representation of the dictionary."""
- return self._data
- def precompute_compress(self, level=0, compression_params=None):
- """Precompute a dictionary os it can be used by multiple compressors.
- Calling this method on an instance that will be used by multiple
- :py:class:`ZstdCompressor` instances will improve performance.
- """
- if level and compression_params:
- raise ValueError(
- "must only specify one of level or compression_params"
- )
- if not level and not compression_params:
- raise ValueError("must specify one of level or compression_params")
- if level:
- cparams = lib.ZSTD_getCParams(level, 0, len(self._data))
- else:
- cparams = ffi.new("ZSTD_compressionParameters *")[0]
- cparams.chainLog = compression_params.chain_log
- cparams.hashLog = compression_params.hash_log
- cparams.minMatch = compression_params.min_match
- cparams.searchLog = compression_params.search_log
- cparams.strategy = compression_params.strategy
- cparams.targetLength = compression_params.target_length
- cparams.windowLog = compression_params.window_log
- cdict = lib.ZSTD_createCDict_advanced(
- self._data,
- len(self._data),
- lib.ZSTD_dlm_byRef,
- self._dict_type,
- cparams,
- lib.ZSTD_defaultCMem,
- )
- if cdict == ffi.NULL:
- raise ZstdError("unable to precompute dictionary")
- self._cdict = ffi.gc(
- cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict)
- )
- @property
- def _ddict(self):
- ddict = lib.ZSTD_createDDict_advanced(
- self._data,
- len(self._data),
- lib.ZSTD_dlm_byRef,
- self._dict_type,
- lib.ZSTD_defaultCMem,
- )
- if ddict == ffi.NULL:
- raise ZstdError("could not create decompression dict")
- ddict = ffi.gc(
- ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict)
- )
- self.__dict__["_ddict"] = ddict
- return ddict
- def train_dictionary(
- dict_size,
- samples,
- k=0,
- d=0,
- f=0,
- split_point=0.0,
- accel=0,
- notifications=0,
- dict_id=0,
- level=0,
- steps=0,
- threads=0,
- ):
- """Train a dictionary from sample data using the COVER algorithm.
- A compression dictionary of size ``dict_size`` will be created from the
- iterable of ``samples``. The raw dictionary bytes will be returned.
- The dictionary training mechanism is known as *cover*. More details about it
- are available in the paper *Effective Construction of Relative Lempel-Ziv
- Dictionaries* (authors: Liao, Petri, Moffat, Wirth).
- The cover algorithm takes parameters ``k`` and ``d``. These are the
- *segment size* and *dmer size*, respectively. The returned dictionary
- instance created by this function has ``k`` and ``d`` attributes
- containing the values for these parameters. If a ``ZstdCompressionDict``
- is constructed from raw bytes data (a content-only dictionary), the
- ``k`` and ``d`` attributes will be ``0``.
- The segment and dmer size parameters to the cover algorithm can either be
- specified manually or ``train_dictionary()`` can try multiple values
- and pick the best one, where *best* means the smallest compressed data size.
- This latter mode is called *optimization* mode.
- Under the hood, this function always calls
- ``ZDICT_optimizeTrainFromBuffer_fastCover()``. See the corresponding C library
- documentation for more.
- If neither ``steps`` nor ``threads`` is defined, defaults for ``d``, ``steps``,
- and ``level`` will be used that are equivalent to what
- ``ZDICT_trainFromBuffer()`` would use.
- :param dict_size:
- Target size in bytes of the dictionary to generate.
- :param samples:
- A list of bytes holding samples the dictionary will be trained from.
- :param k:
- Segment size : constraint: 0 < k : Reasonable range [16, 2048+]
- :param d:
- dmer size : constraint: 0 < d <= k : Reasonable range [6, 16]
- :param f:
- log of size of frequency array : constraint: 0 < f <= 31 : 1 means
- default(20)
- :param split_point:
- Percentage of samples used for training: Only used for optimization.
- The first # samples * ``split_point`` samples will be used for training.
- The last # samples * (1 - split_point) samples will be used for testing.
- 0 means default (0.75), 1.0 when all samples are used for both training
- and testing.
- :param accel:
- Acceleration level: constraint: 0 < accel <= 10. Higher means faster
- and less accurate, 0 means default(1).
- :param dict_id:
- Integer dictionary ID for the produced dictionary. Default is 0, which uses
- a random value.
- :param steps:
- Number of steps through ``k`` values to perform when trying parameter
- variations.
- :param threads:
- Number of threads to use when trying parameter variations. Default is 0,
- which means to use a single thread. A negative value can be specified to
- use as many threads as there are detected logical CPUs.
- :param level:
- Integer target compression level when trying parameter variations.
- :param notifications:
- Controls writing of informational messages to ``stderr``. ``0`` (the
- default) means to write nothing. ``1`` writes errors. ``2`` writes
- progression info. ``3`` writes more details. And ``4`` writes all info.
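- A short usage sketch; the samples here are hypothetical, and real
- training data should be plentiful and share common patterns:
- >>> samples = [b"common prefix " + str(i).encode("ascii") for i in range(2048)]
- >>> dict_data = zstandard.train_dictionary(8192, samples)
- >>> raw_bytes = dict_data.as_bytes()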
- """
- if not isinstance(samples, list):
- raise TypeError("samples must be a list")
- if threads < 0:
- threads = _cpu_count()
- if not steps and not threads:
- d = d or 8
- steps = steps or 4
- level = level or 3
- total_size = sum(map(len, samples))
- samples_buffer = new_nonzero("char[]", total_size)
- sample_sizes = new_nonzero("size_t[]", len(samples))
- offset = 0
- for i, sample in enumerate(samples):
- if not isinstance(sample, bytes):
- raise ValueError("samples must be bytes")
- sample_len = len(sample)
- ffi.memmove(samples_buffer + offset, sample, sample_len)
- offset += sample_len
- sample_sizes[i] = sample_len
- dict_data = new_nonzero("char[]", dict_size)
- dparams = ffi.new("ZDICT_fastCover_params_t *")[0]
- dparams.k = k
- dparams.d = d
- dparams.f = f
- dparams.steps = steps
- dparams.nbThreads = threads
- dparams.splitPoint = split_point
- dparams.accel = accel
- dparams.zParams.notificationLevel = notifications
- dparams.zParams.dictID = dict_id
- dparams.zParams.compressionLevel = level
- zresult = lib.ZDICT_optimizeTrainFromBuffer_fastCover(
- ffi.addressof(dict_data),
- dict_size,
- ffi.addressof(samples_buffer),
- ffi.addressof(sample_sizes, 0),
- len(samples),
- ffi.addressof(dparams),
- )
- if lib.ZDICT_isError(zresult):
- msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8")
- raise ZstdError("cannot train dict: %s" % msg)
- return ZstdCompressionDict(
- ffi.buffer(dict_data, zresult)[:],
- dict_type=DICT_TYPE_FULLDICT,
- k=dparams.k,
- d=dparams.d,
- )
- class ZstdDecompressionObj(object):
- """A standard library API compatible decompressor.
- This type implements a decompressor that conforms to the API of other
- decompressors in Python's standard library. e.g. ``zlib.decompressobj``
- or ``bz2.BZ2Decompressor``. This allows callers to use zstd decompression
- while conforming to a similar API.
- Compressed data chunks are fed into ``decompress(data)`` and
- uncompressed output (or an empty bytes) is returned. Output from
- subsequent calls needs to be concatenated to reassemble the full
- decompressed byte sequence.
- If ``read_across_frames=False``, each instance is single use: once an
- input frame is decoded, ``decompress()`` will raise an exception. If
- ``read_across_frames=True``, instances can decode multiple frames.
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dobj = dctx.decompressobj()
- >>> data = dobj.decompress(compressed_chunk_0)
- >>> data = dobj.decompress(compressed_chunk_1)
- By default, calls to ``decompress()`` write output data in chunks of size
- ``DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE``. These chunks are concatenated
- before being returned to the caller. It is possible to define the size of
- these temporary chunks by passing ``write_size`` to ``decompressobj()``:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dobj = dctx.decompressobj(write_size=1048576)
- .. note::
- Because calls to ``decompress()`` may need to perform multiple
- memory (re)allocations, this streaming decompression API isn't as
- efficient as other APIs.
- """
- def __init__(self, decompressor, write_size, read_across_frames):
- self._decompressor = decompressor
- self._write_size = write_size
- self._finished = False
- self._read_across_frames = read_across_frames
- self._unused_input = b""
- def decompress(self, data):
- """Send compressed data to the decompressor and obtain decompressed data.
- :param data:
- Data to feed into the decompressor.
- :return:
- Decompressed bytes.
- """
- if self._finished:
- raise ZstdError("cannot use a decompressobj multiple times")
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- data_buffer = ffi.from_buffer(data)
- if len(data_buffer) == 0:
- return b""
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- dst_buffer = ffi.new("char[]", self._write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = len(dst_buffer)
- out_buffer.pos = 0
- chunks = []
- while True:
- zresult = lib.ZSTD_decompressStream(
- self._decompressor._dctx, out_buffer, in_buffer
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd decompressor error: %s" % _zstd_error(zresult)
- )
- # Always record any output from decompressor.
- if out_buffer.pos:
- chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
- # 0 is only seen when a frame is fully decoded *and* fully flushed.
- # Behavior depends on whether we're in single or multiple frame
- # mode.
- if zresult == 0 and not self._read_across_frames:
- # Mark the instance as done and make any unconsumed input available
- # for retrieval.
- self._finished = True
- self._decompressor = None
- self._unused_input = data[in_buffer.pos : in_buffer.size]
- break
- elif zresult == 0 and self._read_across_frames:
- # We're at the end of a fully flushed frame and we can read more.
- # Try to read more if there's any more input.
- if in_buffer.pos == in_buffer.size:
- break
- else:
- out_buffer.pos = 0
- # We're not at the end of the frame *or* we're not fully flushed.
- # The decompressor will write out all the bytes it can to the output
- # buffer. So if the output buffer is partially filled and the input
- # is exhausted, there's nothing more to write. So we've done all we
- # can.
- elif (
- in_buffer.pos == in_buffer.size
- and out_buffer.pos < out_buffer.size
- ):
- break
- else:
- out_buffer.pos = 0
- return b"".join(chunks)
- def flush(self, length=0):
- """Effectively a no-op.
- Implemented for compatibility with the standard library APIs.
- Safe to call at any time.
- :return:
- Empty bytes.
- """
- return b""
- @property
- def unused_data(self):
- """Bytes past the end of compressed data.
- If ``decompress()`` is fed additional data beyond the end of a zstd
- frame, this value will be non-empty once ``decompress()`` fully decodes
- the input frame.
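- For example, assuming ``frame`` holds exactly one zstd frame:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dobj = dctx.decompressobj()
- >>> data = dobj.decompress(frame + b"trailing")
- >>> dobj.unused_data
- b'trailing'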
- """
- return self._unused_input
- @property
- def unconsumed_tail(self):
- """Data that has not yet been fed into the decompressor."""
- return b""
- @property
- def eof(self):
- """Whether the end of the compressed data stream has been reached."""
- return self._finished
- class ZstdDecompressionReader(object):
- """Read only decompressor that pull uncompressed data from another stream.
- This type provides a read-only stream interface for performing transparent
- decompression from another stream or data source. It conforms to the
- ``io.RawIOBase`` interface. Only methods relevant to reading are
- implemented.
- >>> with open(path, 'rb') as fh:
- ... dctx = zstandard.ZstdDecompressor()
- ... reader = dctx.stream_reader(fh)
- ... while True:
- ... chunk = reader.read(16384)
- ... if not chunk:
- ... break
- ... # Do something with decompressed chunk.
- The stream can also be used as a context manager:
- >>> with open(path, 'rb') as fh:
- ... dctx = zstandard.ZstdDecompressor()
- ... with dctx.stream_reader(fh) as reader:
- ... ...
- When used as a context manager, the stream is closed and the underlying
- resources are released when the context manager exits. Future operations
- against the stream will fail.
- The ``source`` argument to ``stream_reader()`` can be any object with a
- ``read(size)`` method or any object implementing the *buffer protocol*.
- If the ``source`` is a stream, you can specify how large ``read()`` requests
- to that stream should be via the ``read_size`` argument. It defaults to
- ``zstandard.DECOMPRESSION_RECOMMENDED_INPUT_SIZE``.:
- >>> with open(path, 'rb') as fh:
- ... dctx = zstandard.ZstdDecompressor()
- ... # Will perform fh.read(8192) when obtaining data for the decompressor.
- ... with dctx.stream_reader(fh, read_size=8192) as reader:
- ... ...
- Instances are *partially* seekable. Absolute and relative positions
- (``SEEK_SET`` and ``SEEK_CUR``) forward of the current position are
- allowed. Offsets behind the current read position and offsets relative
- to the end of stream are not allowed and will raise ``ValueError``
- if attempted.
- ``tell()`` returns the number of decompressed bytes read so far.
- Not all I/O methods are implemented. Notably missing is support for
- ``readline()``, ``readlines()``, and linewise iteration support. This is
- because streams operate on binary data - not text data. If you want to
- convert decompressed output to text, you can chain an ``io.TextIOWrapper``
- to the stream:
- >>> with open(path, 'rb') as fh:
- ... dctx = zstandard.ZstdDecompressor()
- ... stream_reader = dctx.stream_reader(fh)
- ... text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
- ... for line in text_stream:
- ... ...
- """
- def __init__(
- self,
- decompressor,
- source,
- read_size,
- read_across_frames,
- closefd=True,
- ):
- self._decompressor = decompressor
- self._source = source
- self._read_size = read_size
- self._read_across_frames = bool(read_across_frames)
- self._closefd = bool(closefd)
- self._entered = False
- self._closed = False
- self._bytes_decompressed = 0
- self._finished_input = False
- self._finished_output = False
- self._in_buffer = ffi.new("ZSTD_inBuffer *")
- # Holds a ref to self._in_buffer.src.
- self._source_buffer = None
- def __enter__(self):
- if self._entered:
- raise ValueError("cannot __enter__ multiple times")
- if self._closed:
- raise ValueError("stream is closed")
- self._entered = True
- return self
- def __exit__(self, exc_type, exc_value, exc_tb):
- self._entered = False
- self._decompressor = None
- self.close()
- self._source = None
- return False
- def readable(self):
- return True
- def writable(self):
- return False
- def seekable(self):
- return False
- def readline(self, size=-1):
- raise io.UnsupportedOperation()
- def readlines(self, hint=-1):
- raise io.UnsupportedOperation()
- def write(self, data):
- raise io.UnsupportedOperation()
- def writelines(self, lines):
- raise io.UnsupportedOperation()
- def isatty(self):
- return False
- def flush(self):
- return None
- def close(self):
- if self._closed:
- return None
- self._closed = True
- f = getattr(self._source, "close", None)
- if self._closefd and f:
- f()
- @property
- def closed(self):
- return self._closed
- def tell(self):
- return self._bytes_decompressed
- def readall(self):
- chunks = []
- while True:
- chunk = self.read(1048576)
- if not chunk:
- break
- chunks.append(chunk)
- return b"".join(chunks)
- def __iter__(self):
- raise io.UnsupportedOperation()
- def __next__(self):
- raise io.UnsupportedOperation()
- next = __next__
- def _read_input(self):
- # We have data left over in the input buffer. Use it.
- if self._in_buffer.pos < self._in_buffer.size:
- return
- # All input data exhausted. Nothing to do.
- if self._finished_input:
- return
- # Else populate the input buffer from our source.
- if hasattr(self._source, "read"):
- data = self._source.read(self._read_size)
- if not data:
- self._finished_input = True
- return
- self._source_buffer = ffi.from_buffer(data)
- self._in_buffer.src = self._source_buffer
- self._in_buffer.size = len(self._source_buffer)
- self._in_buffer.pos = 0
- else:
- self._source_buffer = ffi.from_buffer(self._source)
- self._in_buffer.src = self._source_buffer
- self._in_buffer.size = len(self._source_buffer)
- self._in_buffer.pos = 0
- def _decompress_into_buffer(self, out_buffer):
- """Decompress available input into an output buffer.
- Returns True if data in output buffer should be emitted.
- """
- zresult = lib.ZSTD_decompressStream(
- self._decompressor._dctx, out_buffer, self._in_buffer
- )
- if self._in_buffer.pos == self._in_buffer.size:
- self._in_buffer.src = ffi.NULL
- self._in_buffer.pos = 0
- self._in_buffer.size = 0
- self._source_buffer = None
- if not hasattr(self._source, "read"):
- self._finished_input = True
- if lib.ZSTD_isError(zresult):
- raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult))
- # Emit data if there is data AND either:
- # a) output buffer is full (read amount is satisfied)
- # b) we're at end of a frame and not in frame spanning mode
- return out_buffer.pos and (
- out_buffer.pos == out_buffer.size
- or zresult == 0
- and not self._read_across_frames
- )
- def read(self, size=-1):
- if self._closed:
- raise ValueError("stream is closed")
- if size < -1:
- raise ValueError("cannot read negative amounts less than -1")
- if size == -1:
- # This is recursive. But it gets the job done.
- return self.readall()
- if self._finished_output or size == 0:
- return b""
- # We /could/ call into readinto() here. But that introduces more
- # overhead.
- dst_buffer = ffi.new("char[]", size)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dst_buffer
- out_buffer.size = size
- out_buffer.pos = 0
- self._read_input()
- if self._decompress_into_buffer(out_buffer):
- self._bytes_decompressed += out_buffer.pos
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- while not self._finished_input:
- self._read_input()
- if self._decompress_into_buffer(out_buffer):
- self._bytes_decompressed += out_buffer.pos
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- self._bytes_decompressed += out_buffer.pos
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- def readinto(self, b):
- if self._closed:
- raise ValueError("stream is closed")
- if self._finished_output:
- return 0
- # TODO use writable=True once we require CFFI >= 1.12.
- dest_buffer = ffi.from_buffer(b)
- ffi.memmove(b, b"", 0)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dest_buffer
- out_buffer.size = len(dest_buffer)
- out_buffer.pos = 0
- self._read_input()
- if self._decompress_into_buffer(out_buffer):
- self._bytes_decompressed += out_buffer.pos
- return out_buffer.pos
- while not self._finished_input:
- self._read_input()
- if self._decompress_into_buffer(out_buffer):
- self._bytes_decompressed += out_buffer.pos
- return out_buffer.pos
- self._bytes_decompressed += out_buffer.pos
- return out_buffer.pos
- def read1(self, size=-1):
- if self._closed:
- raise ValueError("stream is closed")
- if size < -1:
- raise ValueError("cannot read negative amounts less than -1")
- if self._finished_output or size == 0:
- return b""
- # -1 returns arbitrary number of bytes.
- if size == -1:
- size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE
- dst_buffer = ffi.new("char[]", size)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dst_buffer
- out_buffer.size = size
- out_buffer.pos = 0
- # read1() dictates that we can perform at most 1 call to underlying
- # stream to get input. However, we can't satisfy this restriction with
- # decompression because not all input generates output. So we allow
- # multiple read(). But unlike read(), we stop once we have any output.
- while not self._finished_input:
- self._read_input()
- self._decompress_into_buffer(out_buffer)
- if out_buffer.pos:
- break
- self._bytes_decompressed += out_buffer.pos
- return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- def readinto1(self, b):
- if self._closed:
- raise ValueError("stream is closed")
- if self._finished_output:
- return 0
- # TODO use writable=True once we require CFFI >= 1.12.
- dest_buffer = ffi.from_buffer(b)
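- # Zero-length memmove doubles as a writability check (see readinto()).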
- ffi.memmove(b, b"", 0)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = dest_buffer
- out_buffer.size = len(dest_buffer)
- out_buffer.pos = 0
- while not self._finished_input and not self._finished_output:
- self._read_input()
- self._decompress_into_buffer(out_buffer)
- if out_buffer.pos:
- break
- self._bytes_decompressed += out_buffer.pos
- return out_buffer.pos
- def seek(self, pos, whence=os.SEEK_SET):
- if self._closed:
- raise ValueError("stream is closed")
- read_amount = 0
- if whence == os.SEEK_SET:
- if pos < 0:
- raise OSError("cannot seek to negative position with SEEK_SET")
- if pos < self._bytes_decompressed:
- raise OSError("cannot seek zstd decompression stream backwards")
- read_amount = pos - self._bytes_decompressed
- elif whence == os.SEEK_CUR:
- if pos < 0:
- raise OSError("cannot seek zstd decompression stream backwards")
- read_amount = pos
- elif whence == os.SEEK_END:
- raise OSError(
- "zstd decompression streams cannot be seeked with SEEK_END"
- )
- while read_amount:
- result = self.read(
- min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
- )
- if not result:
- break
- read_amount -= len(result)
- return self._bytes_decompressed
- class ZstdDecompressionWriter(object):
- """
- Write-only stream wrapper that performs decompression.
- This type provides a writable stream that performs decompression and writes
- decompressed data to another stream.
- This type implements the ``io.RawIOBase`` interface. Only methods that
- involve writing will do useful things.
- Behavior is similar to :py:meth:`ZstdCompressor.stream_writer`: compressed
- data is sent to the decompressor by calling ``write(data)`` and decompressed
- output is written to the inner stream by calling its ``write(data)``
- method:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> decompressor = dctx.stream_writer(fh)
- >>> # Will call fh.write() with uncompressed data.
- >>> decompressor.write(compressed_data)
- Instances can be used as context managers. However, context managers add no
- extra special behavior other than automatically calling ``close()`` when
- they exit.
- Calling ``close()`` will mark the stream as closed and subsequent I/O
- operations will raise ``ValueError`` (per the documented behavior of
- ``io.RawIOBase``). ``close()`` will also call ``close()`` on the
- underlying stream if such a method exists and the instance was created with
- ``closefd=True``.
- The size of chunks to ``write()`` to the destination can be specified:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with dctx.stream_writer(fh, write_size=16384) as decompressor:
- >>> pass
- You can see how much memory is being used by the decompressor:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with dctx.stream_writer(fh) as decompressor:
- >>> byte_size = decompressor.memory_size()
- ``stream_writer()`` accepts a ``write_return_read`` boolean argument to control
- the return value of ``write()``. When ``True`` (the default), ``write()``
- returns the number of bytes that were read from the input. When ``False``,
- ``write()`` returns the number of bytes that were written to the inner
- stream.
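- For example, a minimal sketch (assuming ``fh`` is a writable file object
- and ``compressed_data`` holds a complete zstd frame):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with dctx.stream_writer(fh, write_return_read=False) as decompressor:
- ...     written = decompressor.write(compressed_data)
- ...     # written == number of decompressed bytes sent to fh.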
- """
- def __init__(
- self,
- decompressor,
- writer,
- write_size,
- write_return_read,
- closefd=True,
- ):
- decompressor._ensure_dctx()
- self._decompressor = decompressor
- self._writer = writer
- self._write_size = write_size
- self._write_return_read = bool(write_return_read)
- self._closefd = bool(closefd)
- self._entered = False
- self._closing = False
- self._closed = False
- def __enter__(self):
- if self._closed:
- raise ValueError("stream is closed")
- if self._entered:
- raise ZstdError("cannot __enter__ multiple times")
- self._entered = True
- return self
- def __exit__(self, exc_type, exc_value, exc_tb):
- self._entered = False
- self.close()
- return False
- def __iter__(self):
- raise io.UnsupportedOperation()
- def __next__(self):
- raise io.UnsupportedOperation()
- def memory_size(self):
- return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)
- def close(self):
- if self._closed:
- return
- try:
- self._closing = True
- self.flush()
- finally:
- self._closing = False
- self._closed = True
- f = getattr(self._writer, "close", None)
- if self._closefd and f:
- f()
- @property
- def closed(self):
- return self._closed
- def fileno(self):
- f = getattr(self._writer, "fileno", None)
- if f:
- return f()
- else:
- raise OSError("fileno not available on underlying writer")
- def flush(self):
- if self._closed:
- raise ValueError("stream is closed")
- f = getattr(self._writer, "flush", None)
- if f and not self._closing:
- return f()
- def isatty(self):
- return False
- def readable(self):
- return False
- def readline(self, size=-1):
- raise io.UnsupportedOperation()
- def readlines(self, hint=-1):
- raise io.UnsupportedOperation()
- def seek(self, offset, whence=None):
- raise io.UnsupportedOperation()
- def seekable(self):
- return False
- def tell(self):
- raise io.UnsupportedOperation()
- def truncate(self, size=None):
- raise io.UnsupportedOperation()
- def writable(self):
- return True
- def writelines(self, lines):
- raise io.UnsupportedOperation()
- def read(self, size=-1):
- raise io.UnsupportedOperation()
- def readall(self):
- raise io.UnsupportedOperation()
- def readinto(self, b):
- raise io.UnsupportedOperation()
- def write(self, data):
- if self._closed:
- raise ValueError("stream is closed")
- total_write = 0
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- data_buffer = ffi.from_buffer(data)
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- dst_buffer = ffi.new("char[]", self._write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = len(dst_buffer)
- out_buffer.pos = 0
- dctx = self._decompressor._dctx
- while in_buffer.pos < in_buffer.size:
- zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd decompress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- self._writer.write(
- ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- )
- total_write += out_buffer.pos
- out_buffer.pos = 0
- if self._write_return_read:
- return in_buffer.pos
- else:
- return total_write
- class ZstdDecompressor(object):
- """
- Context for performing zstandard decompression.
- Each instance is essentially a wrapper around a ``ZSTD_DCtx`` from zstd's
- C API.
- An instance can decompress data in various ways. Instances can be used multiple
- times.
- The interface of this class is very similar to
- :py:class:`zstandard.ZstdCompressor` (by design).
- Assume that each ``ZstdDecompressor`` instance can only handle a single
- logical decompression operation at the same time. i.e. if you call a method
- like ``decompressobj()`` to obtain multiple objects derived from the same
- ``ZstdDecompressor`` instance and attempt to use them simultaneously, errors
- will likely occur.
- If you need to perform multiple logical decompression operations and you
- can't guarantee those operations are temporally non-overlapping, you need
- to obtain multiple ``ZstdDecompressor`` instances.
- Unless specified otherwise, assume that no two methods of
- ``ZstdDecompressor`` instances can be called from multiple Python
- threads simultaneously. In other words, assume instances are not thread safe
- unless stated otherwise.
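- For example, a minimal sketch of dictionary-based decompression (assuming
- ``dict_bytes`` holds the raw content of a zstd dictionary and
- ``compressed_with_dict`` was compressed with that dictionary):
- >>> dict_data = zstandard.ZstdCompressionDict(dict_bytes)
- >>> dctx = zstandard.ZstdDecompressor(dict_data=dict_data)
- >>> uncompressed = dctx.decompress(compressed_with_dict)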
- :param dict_data:
- Compression dictionary to use.
- :param max_window_size:
- Sets an upper limit, in bytes, on the window size for decompression
- operations. This setting can be used to prevent large memory allocations
- for inputs using large compression windows.
- :param format:
- Set the format of data for the decoder.
- By default this is ``zstandard.FORMAT_ZSTD1``. It can be set to
- ``zstandard.FORMAT_ZSTD1_MAGICLESS`` to allow decoding frames without
- the 4 byte magic header. Not all decompression APIs support this mode.
- """
- def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
- self._dict_data = dict_data
- self._max_window_size = max_window_size
- self._format = format
- dctx = lib.ZSTD_createDCtx()
- if dctx == ffi.NULL:
- raise MemoryError()
- self._dctx = dctx
- # Defer setting up garbage collection until full state is loaded so
- # the memory size is more accurate.
- try:
- self._ensure_dctx()
- finally:
- self._dctx = ffi.gc(
- dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx)
- )
- def memory_size(self):
- """Size of decompression context, in bytes.
- >>> dctx = zstandard.ZstdDecompressor()
- >>> size = dctx.memory_size()
- """
- return lib.ZSTD_sizeof_DCtx(self._dctx)
- def decompress(
- self,
- data,
- max_output_size=0,
- read_across_frames=False,
- allow_extra_data=True,
- ):
- """
- Decompress data in a single operation.
- This method will decompress the input data in a single operation and
- return the decompressed data.
- The input bytes are expected to contain at least 1 full Zstandard frame
- (something compressed with :py:meth:`ZstdCompressor.compress` or
- similar). If the input does not contain a full frame, an exception will
- be raised.
- ``read_across_frames`` controls whether to read multiple zstandard
- frames in the input. When False, decompression stops after reading the
- first frame. This feature is not yet implemented; the argument is
- provided for forward API compatibility, as the default will change to
- True in a future release. For now, if you need to decompress multiple
- frames, use an API like :py:meth:`ZstdDecompressor.stream_reader` with
- ``read_across_frames=True``.
- ``allow_extra_data`` controls how to handle extra input data after a
- fully decoded frame. If False, any extra data (which could be a valid
- zstd frame) will result in ``ZstdError`` being raised. If True (the
- default), extra data is silently ignored. Silently ignoring extra data
- is undesirable in many scenarios (see #181), so the default will likely
- change to False in a future release, when ``read_across_frames``
- defaults to True.
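- For example, a minimal sketch (assuming ``frame`` holds a complete zstd
- frame):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> # Raises zstandard.ZstdError because of the unused trailing bytes:
- >>> dctx.decompress(frame + b"trailing junk", allow_extra_data=False)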
- If the frame header of the compressed data does not contain the content
- size, ``max_output_size`` must be specified or ``ZstdError`` will be
- raised. An allocation of size ``max_output_size`` will be performed and an
- attempt will be made to perform decompression into that buffer. If the
- buffer is too small or cannot be allocated, ``ZstdError`` will be
- raised. The buffer will be resized if it is too large.
- Uncompressed data could be much larger than compressed data. As a result,
- calling this function could result in a very large memory allocation
- being performed to hold the uncompressed data. This could potentially
- result in ``MemoryError`` or system memory swapping. If you don't need
- the full output data in a single contiguous array in memory, consider
- using streaming decompression for more resilient memory behavior.
- Usage:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> decompressed = dctx.decompress(data)
- If the compressed data doesn't have its content size embedded within it,
- decompression can be attempted by specifying the ``max_output_size``
- argument:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> uncompressed = dctx.decompress(data, max_output_size=1048576)
- Ideally, ``max_output_size`` will be identical to the decompressed
- output size.
- .. important::
- If the exact size of decompressed data is unknown (not passed in
- explicitly and not stored in the zstd frame), for performance
- reasons it is encouraged to use a streaming API.
- :param data:
- Compressed data to decompress.
- :param max_output_size:
- Integer maximum size of the output buffer. If ``0``, no limit is
- imposed and the output buffer is sized from the content size recorded
- in the frame header.
- :return:
- ``bytes`` representing decompressed output.
- """
- if read_across_frames:
- raise ZstdError(
- "ZstdDecompressor.read_across_frames=True is not yet implemented"
- )
- self._ensure_dctx()
- data_buffer = ffi.from_buffer(data)
- params = ffi.new("ZSTD_FrameHeader *")
- zresult = lib.ZSTD_getFrameHeader_advanced(
- params, data_buffer, len(data_buffer), self._format
- )
- if zresult != 0:
- raise ZstdError("error determining content size from frame header")
- output_size = params.frameContentSize
- if output_size == 0:
- return b""
- elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
- if not max_output_size:
- raise ZstdError(
- "could not determine content size in frame header"
- )
- result_buffer = ffi.new("char[]", max_output_size)
- result_size = max_output_size
- output_size = 0
- else:
- result_buffer = ffi.new("char[]", output_size)
- result_size = output_size
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = result_buffer
- out_buffer.size = result_size
- out_buffer.pos = 0
- in_buffer = ffi.new("ZSTD_inBuffer *")
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
- if lib.ZSTD_isError(zresult):
- raise ZstdError("decompression error: %s" % _zstd_error(zresult))
- elif zresult:
- raise ZstdError(
- "decompression error: did not decompress full frame"
- )
- elif output_size and out_buffer.pos != output_size:
- raise ZstdError(
- "decompression error: decompressed %d bytes; expected %d"
- % (out_buffer.pos, output_size)
- )
- elif not allow_extra_data and in_buffer.pos < in_buffer.size:
- count = in_buffer.size - in_buffer.pos
- raise ZstdError(
- "compressed input contains %d bytes of unused data, which is disallowed"
- % count
- )
- return ffi.buffer(result_buffer, out_buffer.pos)[:]
- def stream_reader(
- self,
- source,
- read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
- read_across_frames=False,
- closefd=True,
- ):
- """
- Read-only stream wrapper that performs decompression.
- This method obtains an object that conforms to the ``io.RawIOBase``
- interface and performs transparent decompression via ``read()``
- operations. Source data is obtained by calling ``read()`` on a
- source stream or object implementing the buffer protocol.
- See :py:class:`zstandard.ZstdDecompressionReader` for more documentation
- and usage examples.
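- For example, a minimal sketch (assuming ``fh`` is a stream of compressed
- data opened for reading):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with dctx.stream_reader(fh) as reader:
- ...     while True:
- ...         chunk = reader.read(16384)
- ...         if not chunk:
- ...             break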
- :param source:
- Source of compressed data to decompress. Can be any object
- with a ``read(size)`` method or that conforms to the buffer protocol.
- :param read_size:
- Integer number of bytes to read from the source and feed into the
- decompressor at a time.
- :param read_across_frames:
- Whether to read data across multiple zstd frames. If False,
- decompression is stopped at frame boundaries.
- :param closefd:
- Whether to close the source stream when this instance is closed.
- :return:
- :py:class:`zstandard.ZstdDecompressionReader`.
- """
- self._ensure_dctx()
- return ZstdDecompressionReader(
- self, source, read_size, read_across_frames, closefd=closefd
- )
- def decompressobj(
- self,
- write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- read_across_frames=False,
- ):
- """Obtain a standard library compatible incremental decompressor.
- See :py:class:`ZstdDecompressionObj` for more documentation
- and usage examples.
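- For example, a minimal sketch (assuming ``compressed_chunks`` is an
- iterable of ``bytes`` from a single zstd stream):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dobj = dctx.decompressobj()
- >>> data = b"".join(dobj.decompress(chunk) for chunk in compressed_chunks)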
- :param write_size: size of internal output buffer to collect decompressed
- chunks in.
- :param read_across_frames: whether to read across multiple zstd frames.
- If False, reading stops after 1 frame and subsequent decompress
- attempts will raise an exception.
- :return:
- :py:class:`zstandard.ZstdDecompressionObj`
- """
- if write_size < 1:
- raise ValueError("write_size must be positive")
- self._ensure_dctx()
- return ZstdDecompressionObj(
- self, write_size=write_size, read_across_frames=read_across_frames
- )
- def read_to_iter(
- self,
- reader,
- read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
- write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- skip_bytes=0,
- ):
- """Read compressed data to an iterator of uncompressed chunks.
- This method will read data from ``reader``, feed it to a decompressor,
- and emit ``bytes`` chunks representing the decompressed result.
- >>> dctx = zstandard.ZstdDecompressor()
- >>> for chunk in dctx.read_to_iter(fh):
- ... # Do something with original data.
- ``read_to_iter()`` accepts an object with a ``read(size)`` method that
- will return compressed bytes or an object conforming to the buffer
- protocol.
- ``read_to_iter()`` returns an iterator whose elements are chunks of the
- decompressed data.
- The size of requested ``read()`` from the source can be specified:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> for chunk in dctx.read_to_iter(fh, read_size=16384):
- ... pass
- It is also possible to skip leading bytes in the input data:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> for chunk in dctx.read_to_iter(fh, skip_bytes=1):
- ... pass
- .. tip::
- Skipping leading bytes is useful if the source data contains extra
- *header* data. Traditionally, you would need to create a slice or
- ``memoryview`` of the data you want to decompress. This would create
- overhead. It is more efficient to pass the offset into this API.
- Similarly to :py:meth:`ZstdCompressor.read_to_iter`, the consumer of the
- iterator controls when data is decompressed. If the iterator isn't consumed,
- decompression is put on hold.
- When ``read_to_iter()`` is passed an object conforming to the buffer protocol,
- the behavior may seem similar to what occurs when the simple decompression
- API is used. However, this API works when the decompressed size is unknown.
- Furthermore, if feeding large inputs, the decompressor will work in chunks
- instead of performing a single operation.
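- For example, a minimal sketch (assuming ``compressed_bytes`` holds zstd
- compressed data whose decompressed size is unknown):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> uncompressed = b"".join(dctx.read_to_iter(compressed_bytes))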
- :param reader:
- Source of compressed data. Can be any object with a
- ``read(size)`` method or any object conforming to the buffer
- protocol.
- :param read_size:
- Integer size of data chunks to read from ``reader`` and feed into
- the decompressor.
- :param write_size:
- Integer size of data chunks to emit from iterator.
- :param skip_bytes:
- Integer number of bytes to skip over before sending data into
- the decompressor.
- :return:
- Iterator of ``bytes`` representing uncompressed data.
- """
- if skip_bytes >= read_size:
- raise ValueError("skip_bytes must be smaller than read_size")
- if hasattr(reader, "read"):
- have_read = True
- elif hasattr(reader, "__getitem__"):
- have_read = False
- buffer_offset = 0
- size = len(reader)
- else:
- raise ValueError(
- "must pass an object with a read() method or "
- "conforms to buffer protocol"
- )
- if skip_bytes:
- if have_read:
- reader.read(skip_bytes)
- else:
- if skip_bytes > size:
- raise ValueError("skip_bytes larger than first input chunk")
- buffer_offset = skip_bytes
- self._ensure_dctx()
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- dst_buffer = ffi.new("char[]", write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = len(dst_buffer)
- out_buffer.pos = 0
- while True:
- assert out_buffer.pos == 0
- if have_read:
- read_result = reader.read(read_size)
- else:
- remaining = size - buffer_offset
- slice_size = min(remaining, read_size)
- read_result = reader[buffer_offset : buffer_offset + slice_size]
- buffer_offset += slice_size
- # No new input. Break out of read loop.
- if not read_result:
- break
- # Feed all read data into decompressor and emit output until
- # exhausted.
- read_buffer = ffi.from_buffer(read_result)
- in_buffer.src = read_buffer
- in_buffer.size = len(read_buffer)
- in_buffer.pos = 0
- while in_buffer.pos < in_buffer.size:
- assert out_buffer.pos == 0
- zresult = lib.ZSTD_decompressStream(
- self._dctx, out_buffer, in_buffer
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd decompress error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
- out_buffer.pos = 0
- yield data
- if zresult == 0:
- return
- # Repeat loop to collect more input data.
- continue
- # If we get here, input is exhausted.
- def stream_writer(
- self,
- writer,
- write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- write_return_read=True,
- closefd=True,
- ):
- """
- Push-based stream wrapper that performs decompression.
- This method constructs a stream wrapper that conforms to the
- ``io.RawIOBase`` interface and performs transparent decompression
- when writing to a wrapper stream.
- See :py:class:`zstandard.ZstdDecompressionWriter` for more documentation
- and usage examples.
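- For example, a minimal sketch (assuming ``ofh`` is a writable file object
- and ``compressed_data`` holds a complete zstd frame):
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with dctx.stream_writer(ofh) as decompressor:
- ...     decompressor.write(compressed_data)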
- :param writer:
- Destination for decompressed output. Can be any object with a
- ``write(data)``.
- :param write_size:
- Integer size of chunks to ``write()`` to ``writer``.
- :param write_return_read:
- Whether ``write()`` should return the number of bytes of input
- consumed. If False, ``write()`` returns the number of bytes sent
- to the inner stream.
- :param closefd:
- Whether to ``close()`` the inner stream when this stream is closed.
- :return:
- :py:class:`zstandard.ZstdDecompressionWriter`
- """
- if not hasattr(writer, "write"):
- raise ValueError("must pass an object with a write() method")
- return ZstdDecompressionWriter(
- self,
- writer,
- write_size,
- write_return_read,
- closefd=closefd,
- )
- def copy_stream(
- self,
- ifh,
- ofh,
- read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
- write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
- ):
- """
- Copy data between streams, decompressing in the process.
- Compressed data will be read from ``ifh``, decompressed, and written
- to ``ofh``.
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dctx.copy_stream(ifh, ofh)
- e.g. to decompress a file to another file:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> with open(input_path, 'rb') as ifh, open(output_path, 'wb') as ofh:
- ... dctx.copy_stream(ifh, ofh)
- The size of chunks being ``read()`` and ``write()`` from and to the
- streams can be specified:
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dctx.copy_stream(ifh, ofh, read_size=8192, write_size=16384)
- :param ifh:
- Source stream to read compressed data from.
- Must have a ``read()`` method.
- :param ofh:
- Destination stream to write uncompressed data to.
- Must have a ``write()`` method.
- :param read_size:
- The number of bytes to ``read()`` from the source in a single
- operation.
- :param write_size:
- The number of bytes to ``write()`` to the destination in a single
- operation.
- :return:
- 2-tuple of integers representing the number of bytes read and
- written, respectively.
- """
- if not hasattr(ifh, "read"):
- raise ValueError("first argument must have a read() method")
- if not hasattr(ofh, "write"):
- raise ValueError("second argument must have a write() method")
- self._ensure_dctx()
- in_buffer = ffi.new("ZSTD_inBuffer *")
- out_buffer = ffi.new("ZSTD_outBuffer *")
- dst_buffer = ffi.new("char[]", write_size)
- out_buffer.dst = dst_buffer
- out_buffer.size = write_size
- out_buffer.pos = 0
- total_read, total_write = 0, 0
- # Read all available input.
- while True:
- data = ifh.read(read_size)
- if not data:
- break
- data_buffer = ffi.from_buffer(data)
- total_read += len(data_buffer)
- in_buffer.src = data_buffer
- in_buffer.size = len(data_buffer)
- in_buffer.pos = 0
- # Flush all read data to output.
- while in_buffer.pos < in_buffer.size:
- zresult = lib.ZSTD_decompressStream(
- self._dctx, out_buffer, in_buffer
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "zstd decompressor error: %s" % _zstd_error(zresult)
- )
- if out_buffer.pos:
- ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
- total_write += out_buffer.pos
- out_buffer.pos = 0
- # Continue loop to keep reading.
- return total_read, total_write
- def decompress_content_dict_chain(self, frames):
- """
- Decompress a series of frames using the content dictionary chaining technique.
- Such a list of frames is produced by compressing discrete inputs where
- each non-initial input is compressed with a *prefix* dictionary consisting
- of the content of the previous input.
- For example, say you have the following inputs:
- >>> inputs = [b"input 1", b"input 2", b"input 3"]
- The zstd frame chain consists of:
- 1. ``b"input 1"`` compressed in standalone/discrete mode
- 2. ``b"input 2"`` compressed using ``b"input 1"`` as a *prefix* dictionary
- 3. ``b"input 3"`` compressed using ``b"input 2"`` as a *prefix* dictionary
- Each zstd frame **must** have the content size written.
- The following Python code can be used to produce a *prefix dictionary chain*:
- >>> def make_chain(inputs):
- ... frames = []
- ...
- ... # First frame is compressed in standalone/discrete mode.
- ... zctx = zstandard.ZstdCompressor()
- ... frames.append(zctx.compress(inputs[0]))
- ...
- ... # Subsequent frames use the previous fulltext as a prefix dictionary
- ... for i, raw in enumerate(inputs[1:]):
- ... dict_data = zstandard.ZstdCompressionDict(
- ... inputs[i], dict_type=zstandard.DICT_TYPE_RAWCONTENT)
- ... zctx = zstandard.ZstdCompressor(dict_data=dict_data)
- ... frames.append(zctx.compress(raw))
- ...
- ... return frames
- ``decompress_content_dict_chain()`` returns the uncompressed data of the last
- element in the input chain.
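- Continuing the example above, a minimal sketch:
- >>> frames = make_chain(inputs)
- >>> dctx = zstandard.ZstdDecompressor()
- >>> dctx.decompress_content_dict_chain(frames) == inputs[-1]
- True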
- .. note::
- It is possible to implement *prefix dictionary chain* decompression
- on top of other APIs. However, this function will likely be faster -
- especially for long input chains - as it avoids the overhead of
- instantiating and passing around intermediate objects between
- multiple functions.
- :param frames:
- List of ``bytes`` holding compressed zstd frames.
- :return:
- ``bytes`` representing the uncompressed data of the last frame.
- """
- if not isinstance(frames, list):
- raise TypeError("argument must be a list")
- if not frames:
- raise ValueError("empty input chain")
- # First chunk should not be using a dictionary. We handle it specially.
- chunk = frames[0]
- if not isinstance(chunk, bytes):
- raise ValueError("chunk 0 must be bytes")
- # All chunks should be zstd frames and should have content size set.
- chunk_buffer = ffi.from_buffer(chunk)
- params = ffi.new("ZSTD_FrameHeader *")
- zresult = lib.ZSTD_getFrameHeader(
- params, chunk_buffer, len(chunk_buffer)
- )
- if lib.ZSTD_isError(zresult):
- raise ValueError("chunk 0 is not a valid zstd frame")
- elif zresult:
- raise ValueError("chunk 0 is too small to contain a zstd frame")
- if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
- raise ValueError("chunk 0 missing content size in frame")
- self._ensure_dctx(load_dict=False)
- last_buffer = ffi.new("char[]", params.frameContentSize)
- out_buffer = ffi.new("ZSTD_outBuffer *")
- out_buffer.dst = last_buffer
- out_buffer.size = len(last_buffer)
- out_buffer.pos = 0
- in_buffer = ffi.new("ZSTD_inBuffer *")
- in_buffer.src = chunk_buffer
- in_buffer.size = len(chunk_buffer)
- in_buffer.pos = 0
- zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "could not decompress chunk 0: %s" % _zstd_error(zresult)
- )
- elif zresult:
- raise ZstdError("chunk 0 did not decompress full frame")
- # Special case of chain length of 1
- if len(frames) == 1:
- return ffi.buffer(last_buffer, len(last_buffer))[:]
- i = 1
- while i < len(frames):
- chunk = frames[i]
- if not isinstance(chunk, bytes):
- raise ValueError("chunk %d must be bytes" % i)
- chunk_buffer = ffi.from_buffer(chunk)
- zresult = lib.ZSTD_getFrameHeader(
- params, chunk_buffer, len(chunk_buffer)
- )
- if lib.ZSTD_isError(zresult):
- raise ValueError("chunk %d is not a valid zstd frame" % i)
- elif zresult:
- raise ValueError(
- "chunk %d is too small to contain a zstd frame" % i
- )
- if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
- raise ValueError("chunk %d missing content size in frame" % i)
- dest_buffer = ffi.new("char[]", params.frameContentSize)
- out_buffer.dst = dest_buffer
- out_buffer.size = len(dest_buffer)
- out_buffer.pos = 0
- in_buffer.src = chunk_buffer
- in_buffer.size = len(chunk_buffer)
- in_buffer.pos = 0
- zresult = lib.ZSTD_decompressStream(
- self._dctx, out_buffer, in_buffer
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "could not decompress chunk %d: %s" % _zstd_error(zresult)
- )
- elif zresult:
- raise ZstdError("chunk %d did not decompress full frame" % i)
- last_buffer = dest_buffer
- i += 1
- return ffi.buffer(last_buffer, len(last_buffer))[:]
- def multi_decompress_to_buffer(
- self, frames, decompressed_sizes=None, threads=0
- ):
- """
- Decompress multiple zstd frames to output buffers as a single operation.
- (Experimental. Not available in CFFI backend.)
- Compressed frames can be passed to the function as a
- ``BufferWithSegments``, a ``BufferWithSegmentsCollection``, or as a
- list containing objects that conform to the buffer protocol. For best
- performance, pass a ``BufferWithSegmentsCollection`` or a
- ``BufferWithSegments``, as minimal input validation will be done for
- that type. If calling from Python (as opposed to C), constructing one
- of these instances may add overhead that cancels out the performance
- benefit of the reduced validation.
- Returns a ``BufferWithSegmentsCollection`` containing the decompressed
- data. All decompressed data is allocated in a single memory buffer. The
- ``BufferWithSegments`` instance tracks which objects are at which offsets
- and their respective lengths.
- >>> dctx = zstandard.ZstdDecompressor()
- >>> results = dctx.multi_decompress_to_buffer([b'...', b'...'])
- The decompressed size of each frame MUST be discoverable. It can either be
- embedded within the zstd frame or passed in via the ``decompressed_sizes``
- argument.
- The ``decompressed_sizes`` argument is an object conforming to the buffer
- protocol which holds an array of 64-bit unsigned integers in the machine's
- native format defining the decompressed sizes of each frame. If this argument
- is passed, it avoids having to scan each frame for its decompressed size.
- This frame scanning can add noticeable overhead in some scenarios.
- >>> frames = [...]
- >>> sizes = struct.pack('=QQQQ', len0, len1, len2, len3)
- >>>
- >>> dctx = zstandard.ZstdDecompressor()
- >>> results = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes)
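- Individual frames can then be accessed by index, as a sketch (assuming the
- C backend, where this method is implemented, and that each segment exposes
- ``tobytes()``):
- >>> for i in range(len(results)):
- ...     frame_data = results[i].tobytes()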
- .. note::
- It is possible to pass a ``mmap.mmap()`` instance into this function by
- wrapping it with a ``BufferWithSegments`` instance (which will define the
- offsets of frames within the memory mapped region).
- This function is logically equivalent to performing
- :py:meth:`ZstdDecompressor.decompress` on each input frame and returning the
- result.
- This function exists to perform decompression on multiple frames as fast
- as possible by having as little overhead as possible. Since decompression is
- performed as a single operation and since the decompressed output is stored in
- a single buffer, extra memory allocations, Python objects, and Python function
- calls are avoided. This is ideal for scenarios where callers know up front that
- they need to access data for multiple frames, such as when *delta chains* are
- being used.
- Currently, the implementation always spawns multiple threads when requested,
- even if the amount of work to do is small. In the future, it will be smarter
- about avoiding threads and their associated overhead when the amount of
- work to do is small.
- :param frames:
- Source defining zstd frames to decompress.
- :param decompressed_sizes:
- Array of integers representing sizes of decompressed zstd frames.
- :param threads:
- How many threads to use for decompression operations.
- Negative values will use the same number of threads as logical CPUs
- on the machine. Values ``0`` or ``1`` use a single thread.
- :return:
- ``BufferWithSegmentsCollection``
- """
- raise NotImplementedError()
- def _ensure_dctx(self, load_dict=True):
- lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only)
- if self._max_window_size:
- zresult = lib.ZSTD_DCtx_setMaxWindowSize(
- self._dctx, self._max_window_size
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "unable to set max window size: %s" % _zstd_error(zresult)
- )
- zresult = lib.ZSTD_DCtx_setParameter(
- self._dctx, lib.ZSTD_d_format, self._format
- )
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "unable to set decoding format: %s" % _zstd_error(zresult)
- )
- if self._dict_data and load_dict:
- zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict)
- if lib.ZSTD_isError(zresult):
- raise ZstdError(
- "unable to reference prepared dictionary: %s"
- % _zstd_error(zresult)
- )
|