from collections import defaultdict
from concurrent import futures
from functools import partial, reduce

import json
from collections.abc import Collection
import numpy as np
import os
import re
import operator
import urllib.parse

import pyarrow as pa
import pyarrow.lib as lib
import pyarrow._parquet as _parquet

from pyarrow._parquet import (ParquetReader, Statistics,  # noqa
                              FileMetaData, RowGroupMetaData,
                              ColumnChunkMetaData,
                              ParquetSchema, ColumnSchema)
from pyarrow.fs import (LocalFileSystem, FileSystem,
                        _resolve_filesystem_and_path, _ensure_filesystem)
from pyarrow import filesystem as legacyfs
from pyarrow.util import guid, _is_path_like, _stringify_path

_URI_STRIP_SCHEMES = ('hdfs',)


def _parse_uri(path):
    path = _stringify_path(path)
    parsed_uri = urllib.parse.urlparse(path)
    if parsed_uri.scheme in _URI_STRIP_SCHEMES:
        return parsed_uri.path
    else:
        return path


def _get_filesystem_and_path(passed_filesystem, path):
    if passed_filesystem is None:
        return legacyfs.resolve_filesystem_and_path(path, passed_filesystem)
    else:
        passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem)
        parsed_path = _parse_uri(path)
        return passed_filesystem, parsed_path


def _check_contains_null(val):
    if isinstance(val, bytes):
        for byte in val:
            if isinstance(byte, bytes):
                compare_to = chr(0)
            else:
                compare_to = 0
            if byte == compare_to:
                return True
    elif isinstance(val, str):
        return '\x00' in val
    return False


def _check_filters(filters, check_null_strings=True):
    """
    Check if filters are well-formed.
    """
    if filters is not None:
        if len(filters) == 0 or any(len(f) == 0 for f in filters):
            raise ValueError("Malformed filters")
        if isinstance(filters[0][0], str):
            # We have a single conjunction ([(,,), ..]) instead of the
            # full DNF form ([[(,,), ..]]); add the missing nesting level.
            filters = [filters]
        if check_null_strings:
            for conjunction in filters:
                for col, op, val in conjunction:
                    if (isinstance(val, list) and
                            all(_check_contains_null(v) for v in val) or
                            _check_contains_null(val)):
                        raise NotImplementedError(
                            "Null-terminated binary strings are not supported "
                            "as filter values.")
    return filters


_DNF_filter_doc = """Predicates are expressed in disjunctive normal form (DNF),
    like ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
    combinations of single column predicates. The innermost tuples each
    describe a single column predicate. The list of inner predicates is
    interpreted as a conjunction (AND), forming a more selective and
    multiple column predicate. Finally, the most outer list combines these
    filters as a disjunction (OR).

    Predicates may also be passed as List[Tuple]. This form is interpreted
    as a single conjunction. To express OR in predicates, one must
    use the (preferred) List[List[Tuple]] notation.

    Each tuple has format: (``key``, ``op``, ``value``) and compares the
    ``key`` with the ``value``.
    The supported ``op`` are:  ``=`` or ``==``, ``!=``, ``<``, ``>``, ``<=``,
    ``>=``, ``in`` and ``not in``. If the ``op`` is ``in`` or ``not in``, the
    ``value`` must be a collection such as a ``list``, a ``set`` or a
    ``tuple``.

    Examples:

    .. code-block:: python

        ('x', '=', 0)
        ('y', 'in', ['a', 'b', 'c'])
        ('z', 'not in', {'a','b'})
    """


def _filters_to_expression(filters):
    """
    Check if filters are well-formed and convert them to a dataset
    ``Expression``.
    """
    import pyarrow.dataset as ds

    if isinstance(filters, ds.Expression):
        return filters

    filters = _check_filters(filters, check_null_strings=False)

    def convert_single_predicate(col, op, val):
        field = ds.field(col)

        if op == '=' or op == '==':
            return field == val
        elif op == '!=':
            return field != val
        elif op == '<':
            return field < val
        elif op == '>':
            return field > val
        elif op == '<=':
            return field <= val
        elif op == '>=':
            return field >= val
        elif op == 'in':
            return field.isin(val)
        elif op == 'not in':
            return ~field.isin(val)
        else:
            raise ValueError(
                '"{0}" is not a valid operator in predicates.'.format(
                    (col, op, val)))

    disjunction_members = []
    for conjunction in filters:
        conjunction_members = [
            convert_single_predicate(col, op, val)
            for col, op, val in conjunction
        ]
        disjunction_members.append(reduce(operator.and_, conjunction_members))

    return reduce(operator.or_, disjunction_members)


class ParquetFile:
    """
    Reader interface for a single Parquet file.

    Parameters
    ----------
    source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
        Readable source. For passing bytes or buffer-like file containing a
        Parquet file, use pyarrow.BufferReader.
    metadata : FileMetaData, default None
        Use existing metadata object, rather than reading from file.
    common_metadata : FileMetaData, default None
        Will be used in reads for pandas schema metadata if not found in the
        main file's metadata, no other uses at the moment.
    memory_map : bool, default False
        If the source is a file path, use a memory map to read file, which can
        improve performance in some environments.
    buffer_size : int, default 0
        If positive, perform read buffering when deserializing individual
        column chunks. Otherwise IO calls are unbuffered.
    """

    def __init__(self, source, metadata=None, common_metadata=None,
                 read_dictionary=None, memory_map=False, buffer_size=0):
        self.reader = ParquetReader()
        self.reader.open(source, use_memory_map=memory_map,
                         buffer_size=buffer_size,
                         read_dictionary=read_dictionary, metadata=metadata)
        self.common_metadata = common_metadata
        self._nested_paths_by_prefix = self._build_nested_paths()

    def _build_nested_paths(self):
        paths = self.reader.column_paths

        result = defaultdict(list)

        for i, path in enumerate(paths):
            key = path[0]
            rest = path[1:]
            while True:
                result[key].append(i)

                if not rest:
                    break

                key = '.'.join((key, rest[0]))
                rest = rest[1:]

        return result

    @property
    def metadata(self):
        return self.reader.metadata

    @property
    def schema(self):
        """
        Return the Parquet schema, unconverted to Arrow types
        """
        return self.metadata.schema

    @property
    def schema_arrow(self):
        """
        Return the inferred Arrow schema, converted from the whole Parquet
        file's schema
        """
        return self.reader.schema_arrow

    @property
    def num_row_groups(self):
        return self.reader.num_row_groups

    def read_row_group(self, i, columns=None, use_threads=True,
                       use_pandas_metadata=False):
        """
        Read a single row group from a Parquet file.

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the row group.
            A column name may be a prefix of a nested field, e.g. 'a' will
            select 'a.b', 'a.c', and 'a.d.e'.
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        pyarrow.table.Table
            Content of the row group as a table (of columns)
        """
        column_indices = self._get_column_indices(
            columns, use_pandas_metadata=use_pandas_metadata)
        return self.reader.read_row_group(i, column_indices=column_indices,
                                          use_threads=use_threads)

    def read_row_groups(self, row_groups, columns=None, use_threads=True,
                        use_pandas_metadata=False):
        """
        Read multiple row groups from a Parquet file.

        Parameters are the same as for ``read_row_group``, with ``row_groups``
        being the list of row group indices to read.

        Returns
        -------
        pyarrow.table.Table
            Content of the row groups as a table (of columns).
        """
        column_indices = self._get_column_indices(
            columns, use_pandas_metadata=use_pandas_metadata)
        return self.reader.read_row_groups(row_groups,
                                           column_indices=column_indices,
                                           use_threads=use_threads)

    def iter_batches(self, batch_size=65536, row_groups=None, columns=None,
                     use_threads=True, use_pandas_metadata=False):
        """
        Read streaming batches from a Parquet file.

        Parameters
        ----------
        batch_size : int, default 64K
            Maximum number of records to yield per batch. Batches may be
            smaller if there aren't enough rows in the file.
        row_groups : list
            Only these row groups will be read from the file.
        columns : list
            If not None, only these columns will be read from the file.
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        iterator of pyarrow.RecordBatch
            Contents of each batch as a record batch
        """
        if row_groups is None:
            row_groups = range(self.metadata.num_row_groups)
        column_indices = self._get_column_indices(
            columns, use_pandas_metadata=use_pandas_metadata)

        batches = self.reader.iter_batches(batch_size,
                                           row_groups=row_groups,
                                           column_indices=column_indices,
                                           use_threads=use_threads)
        return batches

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read a Table from Parquet format.

        Parameters are the same as for ``read_row_group``, applied to the
        whole file.

        Returns
        -------
        pyarrow.table.Table
            Content of the file as a table (of columns).
        """
        column_indices = self._get_column_indices(
            columns, use_pandas_metadata=use_pandas_metadata)
        return self.reader.read_all(column_indices=column_indices,
                                    use_threads=use_threads)

    def scan_contents(self, columns=None, batch_size=65536):
        """
        Read contents of file for the given columns and batch size.

        Notes
        -----
        This function's primary purpose is benchmarking.
        The scan is executed on a single thread.

        Parameters
        ----------
        columns : list of integers, default None
            Select columns to read, if None scan all columns.
        batch_size : int, default 64K
            Number of rows to read at a time internally.

        Returns
        -------
        num_rows : number of rows in file
        """
        column_indices = self._get_column_indices(columns)
        return self.reader.scan_contents(column_indices,
                                         batch_size=batch_size)

    def _get_column_indices(self, column_names, use_pandas_metadata=False):
        if column_names is None:
            return None

        indices = []

        for name in column_names:
            if name in self._nested_paths_by_prefix:
                indices.extend(self._nested_paths_by_prefix[name])

        if use_pandas_metadata:
            file_keyvalues = self.metadata.metadata
            common_keyvalues = (self.common_metadata.metadata
                                if self.common_metadata is not None
                                else None)

            if file_keyvalues and b'pandas' in file_keyvalues:
                index_columns = _get_pandas_index_columns(file_keyvalues)
            elif common_keyvalues and b'pandas' in common_keyvalues:
                index_columns = _get_pandas_index_columns(common_keyvalues)
            else:
                index_columns = []

            if indices is not None and index_columns:
                indices += [self.reader.column_name_idx(descr)
                            for descr in index_columns
                            if not isinstance(descr, dict)]

        return indices


_SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]')


def _sanitized_spark_field_name(name):
    return _SPARK_DISALLOWED_CHARS.sub('_', name)


def _sanitize_schema(schema, flavor):
    if 'spark' in flavor:
        sanitized_fields = []

        schema_changed = False

        for field in schema:
            name = field.name
            sanitized_name = _sanitized_spark_field_name(name)

            if sanitized_name != name:
                schema_changed = True
                sanitized_field = pa.field(sanitized_name, field.type,
                                           field.nullable, field.metadata)
                sanitized_fields.append(sanitized_field)
            else:
                sanitized_fields.append(field)

        new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
        return new_schema, schema_changed
    else:
        return schema, False


def _sanitize_table(table, new_schema, flavor):
    if 'spark' in flavor:
        column_data = [table[i] for i in range(table.num_columns)]
        return pa.Table.from_arrays(column_data, schema=new_schema)
    else:
        return table
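
# Illustrative sketch, not part of the original module: the selective read
# paths offered by ParquetFile. The path and column names are hypothetical
# caller-provided values; this helper is never used by the library itself.
def _example_read_parquet_file(path, columns=None):
    pf = ParquetFile(path)
    # Read only the first row group eagerly ...
    head = pf.read_row_group(0, columns=columns)
    # ... and stream the remaining row groups in bounded-size record batches,
    # so peak memory is governed by batch_size rather than file size.
    streamed_rows = 0
    for batch in pf.iter_batches(batch_size=64 * 1024, columns=columns,
                                 row_groups=list(range(1, pf.num_row_groups))):
        streamed_rows += batch.num_rows
    return head, streamed_rows
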
_parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
    Determine which Parquet logical types are available for use, whether the
    reduced set from the Parquet 1.x.x format or the expanded logical types
    added in format version 2.0.0 and after. Note that files written with
    version='2.0' may not be readable in all Parquet implementations, so
    version='1.0' is likely the choice that maximizes file compatibility.
    Some features, such as lossless storage of nanosecond timestamps as INT64
    physical storage, are only available with version='2.0'. The Parquet
    2.0.0 format version also introduced a new serialized data page format;
    this can be enabled separately using the data_page_version option.
use_dictionary : bool or list
    Specify if we should use dictionary encoding in general or only for
    some columns.
use_deprecated_int96_timestamps : bool, default None
    Write timestamps to INT96 Parquet format. Defaults to False unless enabled
    by flavor argument. This takes priority over the coerce_timestamps option.
coerce_timestamps : str, default None
    Cast timestamps to a particular resolution. The defaults depend on
    `version`. For ``version='1.0'`` (the default), nanoseconds will be cast
    to microseconds ('us'), and seconds to milliseconds ('ms') by default.
    For ``version='2.0'``, the original resolution is preserved and no casting
    is done by default. The casting might result in loss of data, in which
    case ``allow_truncated_timestamps=True`` can be used to suppress the
    raised exception. Valid values: {None, 'ms', 'us'}
data_page_size : int, default None
    Set a target threshold for the approximate encoded size of data
    pages within a column chunk (in bytes). If None, use the default data
    page size of 1MByte.
allow_truncated_timestamps : bool, default False
    Allow loss of data when coercing timestamps to a particular resolution.
    E.g. if microsecond or nanosecond data is lost when coercing to 'ms', do
    not raise an exception.
compression : str or dict
    Specify the compression codec, either on a general basis or per-column.
    Valid values: {'NONE', 'SNAPPY', 'GZIP', 'BROTLI', 'LZ4', 'ZSTD'}.
write_statistics : bool or list
    Specify if we should write statistics in general (default is True) or only
    for some columns.
flavor : {'spark'}, default None
    Sanitize schema or set other compatibility options to work with
    various target systems.
filesystem : FileSystem, default None
    If nothing passed, will be inferred from `where` if path-like, else
    `where` is already a file-like object so no filesystem is needed.
compression_level : int or dict, default None
    Specify the compression level for a codec, either on a general basis or
    per-column. If None is passed, arrow selects the compression level for
    the compression codec in use. The compression level has a different
    meaning for each codec, so you have to read the documentation of the
    codec you are using. An exception is thrown if the compression codec does
    not allow specifying a compression level.
use_byte_stream_split : bool or list, default False
    Specify if the byte_stream_split encoding should be used in general or
    only for some columns. If both dictionary and byte_stream_split are
    enabled, then dictionary is preferred. The byte_stream_split encoding is
    valid only for floating-point data types and should be combined with a
    compression codec.
data_page_version : {"1.0", "2.0"}, default "1.0"
    The serialized Parquet data page format version to write, defaults to
    1.0. This does not impact the file schema logical types and Arrow to
    Parquet type casting behavior; for that use the "version" option.
"""


class ParquetWriter:

    __doc__ = """
Class for incrementally building a Parquet file for Arrow tables.

Parameters
----------
where : path or file-like object
schema : arrow Schema
{}
**options : dict
    If options contains a key `metadata_collector` then the
    corresponding value is assumed to be a list (or any object with
    `.append` method) that will be filled with the file metadata instance
    of the written file.
""".format(_parquet_writer_arg_docs)

    def __init__(self, where, schema, filesystem=None,
                 flavor=None,
                 version='1.0',
                 use_dictionary=True,
                 compression='snappy',
                 write_statistics=True,
                 use_deprecated_int96_timestamps=None,
                 compression_level=None,
                 use_byte_stream_split=False,
                 writer_engine_version=None,
                 data_page_version='1.0',
                 **options):
        if use_deprecated_int96_timestamps is None:
            # Use int96 timestamps for Spark
            if flavor is not None and 'spark' in flavor:
                use_deprecated_int96_timestamps = True
            else:
                use_deprecated_int96_timestamps = False

        self.flavor = flavor
        if flavor is not None:
            schema, self.schema_changed = _sanitize_schema(schema, flavor)
        else:
            self.schema_changed = False

        self.schema = schema
        self.where = where

        # If we open a file using a filesystem, store the file handle so we
        # can be sure to close it when `self.close` is called.
        self.file_handle = None

        filesystem, path = _resolve_filesystem_and_path(
            where, filesystem, allow_legacy_filesystem=True)
        if filesystem is not None:
            if isinstance(filesystem, legacyfs.FileSystem):
                # legacy filesystem (eg custom subclass)
                sink = self.file_handle = filesystem.open(where, 'wb')
            else:
                # do not auto-detect compression from the file extension
                sink = self.file_handle = filesystem.open_output_stream(
                    path, compression=None)
        else:
            sink = where
        self._metadata_collector = options.pop('metadata_collector', None)
        engine_version = 'V2'
        self.writer = _parquet.ParquetWriter(
            sink, schema,
            version=version,
            compression=compression,
            use_dictionary=use_dictionary,
            write_statistics=write_statistics,
            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
            compression_level=compression_level,
            use_byte_stream_split=use_byte_stream_split,
            writer_engine_version=engine_version,
            data_page_version=data_page_version,
            **options)
        self.is_open = True

    def __del__(self):
        if getattr(self, 'is_open', False):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
        return False

    def write_table(self, table, row_group_size=None):
        if self.schema_changed:
            table = _sanitize_table(table, self.schema, self.flavor)
        assert self.is_open

        if not table.schema.equals(self.schema, check_metadata=False):
            msg = ('Table schema does not match schema used to create file: '
                   '\ntable:\n{!s} vs. \nfile:\n{!s}'
                   .format(table.schema, self.schema))
            raise ValueError(msg)

        self.writer.write_table(table, row_group_size=row_group_size)

    def close(self):
        if self.is_open:
            self.writer.close()
            self.is_open = False
            if self._metadata_collector is not None:
                self._metadata_collector.append(self.writer.metadata)
        if self.file_handle is not None:
            self.file_handle.close()


def _get_pandas_index_columns(keyvalues):
    return (json.loads(keyvalues[b'pandas'].decode('utf8'))
            ['index_columns'])


class ParquetDatasetPiece:
    """
    A single chunk of a potentially larger Parquet dataset to read.

    The arguments will indicate to read either a single row group or all row
    groups, and whether to add partition keys to the resulting pyarrow.Table.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to file in the file system where this piece is located.
    open_file_func : callable
        Function to use for obtaining file handle to dataset piece.
    partition_keys : list of tuples
        Two-element tuples of ``(column name, ordinal index)``.
    row_group : int, default None
        Row group to load. By default, reads all row groups.
    """

    def __init__(self, path, open_file_func=partial(open, mode='rb'),
                 file_options=None, row_group=None, partition_keys=None):
        self.path = _stringify_path(path)
        self.open_file_func = open_file_func
        self.row_group = row_group
        self.partition_keys = partition_keys or []
        self.file_options = file_options or {}

    def __eq__(self, other):
        if not isinstance(other, ParquetDatasetPiece):
            return False
        return (self.path == other.path and
                self.row_group == other.row_group and
                self.partition_keys == other.partition_keys)

    def __repr__(self):
        return ('{}({!r}, row_group={!r}, partition_keys={!r})'
                .format(type(self).__name__, self.path,
                        self.row_group, self.partition_keys))

    def __str__(self):
        result = ''

        if len(self.partition_keys) > 0:
            partition_str = ', '.join('{}={}'.format(name, index)
                                      for name, index in self.partition_keys)
            result += 'partition[{}] '.format(partition_str)

        result += self.path

        if self.row_group is not None:
            result += ' | row_group={}'.format(self.row_group)

        return result

    def get_metadata(self):
        """
        Return the file's metadata.

        Returns
        -------
        metadata : FileMetaData
        """
        f = self.open()
        return f.metadata

    def open(self):
        """
        Return instance of ParquetFile.
        """
        reader = self.open_file_func(self.path)
        if not isinstance(reader, ParquetFile):
            reader = ParquetFile(reader, **self.file_options)
        return reader

    def read(self, columns=None, use_threads=True, partitions=None,
             file=None, use_pandas_metadata=False):
        """
        Read this piece as a pyarrow.Table.

        Parameters
        ----------
        columns : list of column names, default None
        use_threads : bool, default True
            Perform multi-threaded column reads.
        partitions : ParquetPartitions, default None
        file : file-like object
            Passed to ParquetFile.

        Returns
        -------
        table : pyarrow.Table
        """
        if self.open_file_func is not None:
            reader = self.open()
        elif file is not None:
            reader = ParquetFile(file, **self.file_options)
        else:
            # try to read the local path
            reader = ParquetFile(self.path, **self.file_options)

        options = dict(columns=columns,
                       use_threads=use_threads,
                       use_pandas_metadata=use_pandas_metadata)

        if self.row_group is not None:
            table = reader.read_row_group(self.row_group, **options)
        else:
            table = reader.read(**options)

        if len(self.partition_keys) > 0:
            if partitions is None:
                raise ValueError('Must pass partition sets')

            # The index is the categorical code of the partition in which
            # this piece is located; the distinct categories were computed
            # by the ParquetManifest.
            for i, (name, index) in enumerate(self.partition_keys):
                # The partition code is the same for all values in this piece
                indices = np.full(len(table), index, dtype='i4')
                dictionary = partitions.levels[i].dictionary

                arr = pa.DictionaryArray.from_arrays(indices, dictionary)
                table = table.append_column(name, arr)

        return table
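
# Illustrative sketch, not part of the original module: incremental writing
# with ParquetWriter. Each write_table() call adds one or more row groups, and
# the metadata_collector option gathers FileMetaData that can later feed a
# ``_metadata`` sidecar via write_metadata (defined below). ``tables`` and
# ``where`` are hypothetical caller-provided values.
def _example_incremental_write(tables, where):
    collector = []
    schema = tables[0].schema
    with ParquetWriter(where, schema, compression='snappy',
                       metadata_collector=collector) as writer:
        for table in tables:
            writer.write_table(table, row_group_size=64 * 1024)
    # On close, the writer appends the finished file's metadata to collector.
    return collector
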
class PartitionSet:
    """
    A data structure for cataloguing the observed Parquet partitions at a
    particular level. So if we have

    /foo=a/bar=0
    /foo=a/bar=1
    /foo=a/bar=2
    /foo=b/bar=0
    /foo=b/bar=1
    /foo=b/bar=2

    Then we have two partition sets, one for foo, another for bar. As we visit
    levels of the partition hierarchy, a PartitionSet tracks the distinct
    values and assigns categorical codes to use when reading the pieces
    """

    def __init__(self, name, keys=None):
        self.name = name
        self.keys = keys or []
        self.key_indices = {k: i for i, k in enumerate(self.keys)}
        self._dictionary = None

    def get_index(self, key):
        """
        Get the index of the partition value if it is known, otherwise assign
        one
        """
        if key in self.key_indices:
            return self.key_indices[key]
        else:
            index = len(self.key_indices)
            self.keys.append(key)
            self.key_indices[key] = index
            return index

    @property
    def dictionary(self):
        if self._dictionary is not None:
            return self._dictionary

        if len(self.keys) == 0:
            raise ValueError('No known partition keys')

        # Only integer and string partition types are supported right now
        try:
            integer_keys = [int(x) for x in self.keys]
            dictionary = lib.array(integer_keys)
        except ValueError:
            dictionary = lib.array(self.keys)

        self._dictionary = dictionary
        return dictionary

    @property
    def is_sorted(self):
        return list(self.keys) == sorted(self.keys)


class ParquetPartitions:

    def __init__(self):
        self.levels = []
        self.partition_names = set()

    def __len__(self):
        return len(self.levels)

    def __getitem__(self, i):
        return self.levels[i]

    def equals(self, other):
        if not isinstance(other, ParquetPartitions):
            raise TypeError('`other` must be an instance of ParquetPartitions')

        return (self.levels == other.levels and
                self.partition_names == other.partition_names)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def get_index(self, level, name, key):
        """
        Record a partition value at a particular level, returning the distinct
        code for that value at that level.

        Example:

        partitions.get_index(1, 'foo', 'a') returns 0
        partitions.get_index(1, 'foo', 'b') returns 1
        partitions.get_index(1, 'foo', 'c') returns 2
        partitions.get_index(1, 'foo', 'a') returns 0

        Parameters
        ----------
        level : int
            The nesting level of the partition we are observing
        name : str
            The partition name
        key : str or int
            The partition value
        """
        if level == len(self.levels):
            if name in self.partition_names:
                raise ValueError('{} was the name of the partition in '
                                 'another level'.format(name))

            part_set = PartitionSet(name)
            self.levels.append(part_set)
            self.partition_names.add(name)

        return self.levels[level].get_index(key)

    def filter_accepts_partition(self, part_key, filter, level):
        p_column, p_value_index = part_key
        f_column, op, f_value = filter
        if p_column != f_column:
            return True

        f_type = type(f_value)

        if op in {'in', 'not in'}:
            if not isinstance(f_value, Collection):
                raise TypeError(
                    "'%s' object is not a collection", f_type.__name__)
            if not f_value:
                raise ValueError("Cannot use empty collection as filter value")
            if len({type(item) for item in f_value}) != 1:
                raise ValueError("All elements of the collection '%s' must be"
                                 " of same type", f_value)
            f_type = type(next(iter(f_value)))

        elif not isinstance(f_value, str) and isinstance(f_value, Collection):
            raise ValueError(
                "Op '%s' not supported with a collection value", op)

        p_value = f_type(
            self.levels[level].dictionary[p_value_index].as_py())

        if op == "=" or op == "==":
            return p_value == f_value
        elif op == "!=":
            return p_value != f_value
        elif op == '<':
            return p_value < f_value
        elif op == '>':
            return p_value > f_value
        elif op == '<=':
            return p_value <= f_value
        elif op == '>=':
            return p_value >= f_value
        elif op == 'in':
            return p_value in f_value
        elif op == 'not in':
            return p_value not in f_value
        else:
            raise ValueError("'%s' is not a valid operator in predicates.",
                             filter[1])


class ParquetManifest:

    def __init__(self, dirpath, open_file_func=None, filesystem=None,
                 pathsep='/', partition_scheme='hive', metadata_nthreads=1):
        filesystem, dirpath = _get_filesystem_and_path(filesystem, dirpath)
        self.filesystem = filesystem
        self.open_file_func = open_file_func
        self.pathsep = pathsep
        self.dirpath = _stringify_path(dirpath)
        self.partition_scheme = partition_scheme
        self.partitions = ParquetPartitions()
        self.pieces = []
        self._metadata_nthreads = metadata_nthreads
        self._thread_pool = futures.ThreadPoolExecutor(
            max_workers=metadata_nthreads)

        self.common_metadata_path = None
        self.metadata_path = None

        self._visit_level(0, self.dirpath, [])

        # Due to concurrency, pieces will potentially be out of order if the
        # dataset is partitioned, so we sort them to yield stable results
        self.pieces.sort(key=lambda piece: piece.path)

        if self.common_metadata_path is None:
            # _common_metadata is a subset of _metadata
            self.common_metadata_path = self.metadata_path

        self._thread_pool.shutdown()

    def _visit_level(self, level, base_path, part_keys):
        fs = self.filesystem

        _, directories, files = next(fs.walk(base_path))

        filtered_files = []
        for path in files:
            full_path = self.pathsep.join((base_path, path))
            if path.endswith('_common_metadata'):
                self.common_metadata_path = full_path
            elif path.endswith('_metadata'):
                self.metadata_path = full_path
            elif self._should_silently_exclude(path):
                continue
            else:
                filtered_files.append(full_path)

        # Filter out "private" directories starting with underscore or dot
        filtered_directories = [self.pathsep.join((base_path, x))
                                for x in directories
                                if not _is_private_directory(x)]

        filtered_files.sort()
        filtered_directories.sort()

        if len(filtered_files) > 0 and len(filtered_directories) > 0:
            raise ValueError('Found files in an intermediate '
                             'directory: {}'.format(base_path))
        elif len(filtered_directories) > 0:
            self._visit_directories(level, filtered_directories, part_keys)
        else:
            self._push_pieces(filtered_files, part_keys)

    def _should_silently_exclude(self, file_name):
        return (file_name.endswith('.crc') or  # Checksums
                file_name.endswith('_$folder$') or  # HDFS directories in S3
                file_name.startswith('.') or  # Hidden files starting with .
                file_name.startswith('_') or  # Hidden files starting with _
                file_name in EXCLUDED_PARQUET_PATHS)

    def _visit_directories(self, level, directories, part_keys):
        futures_list = []
        for path in directories:
            head, tail = _path_split(path, self.pathsep)
            name, key = _parse_hive_partition(tail)

            index = self.partitions.get_index(level, name, key)
            dir_part_keys = part_keys + [(name, index)]
            # If you have fewer threads than levels of nesting, visit the
            # next level synchronously instead of submitting to the pool
            if level + 1 < self._metadata_nthreads:
                future = self._thread_pool.submit(self._visit_level,
                                                  level + 1, path,
                                                  dir_part_keys)
                futures_list.append(future)
            else:
                self._visit_level(level + 1, path, dir_part_keys)
        if futures_list:
            futures.wait(futures_list)

    def _parse_partition(self, dirname):
        if self.partition_scheme == 'hive':
            return _parse_hive_partition(dirname)
        else:
            raise NotImplementedError('partition schema: {}'
                                      .format(self.partition_scheme))

    def _push_pieces(self, files, part_keys):
        self.pieces.extend([
            ParquetDatasetPiece(path, partition_keys=part_keys,
                                open_file_func=self.open_file_func)
            for path in files
        ])


def _parse_hive_partition(value):
    if '=' not in value:
        raise ValueError('Directory name did not appear to be a '
                         'partition: {}'.format(value))
    return value.split('=', 1)


def _is_private_directory(x):
    _, tail = os.path.split(x)
    return (tail.startswith('_') or tail.startswith('.')) and '=' not in tail


def _path_split(path, sep):
    i = path.rfind(sep) + 1
    head, tail = path[:i], path[i:]
    head = head.rstrip(sep)
    return head, tail


EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}


class _ParquetDatasetMetadata:
    __slots__ = ('fs', 'memory_map', 'read_dictionary', 'common_metadata',
                 'buffer_size')


def _open_dataset_file(dataset, path, meta=None):
    if (dataset.fs is not None and
            not isinstance(dataset.fs, legacyfs.LocalFileSystem)):
        path = dataset.fs.open(path, mode='rb')
    return ParquetFile(
        path,
        metadata=meta,
        memory_map=dataset.memory_map,
        read_dictionary=dataset.read_dictionary,
        common_metadata=dataset.common_metadata,
        buffer_size=dataset.buffer_size
    )


_read_docstring_common = """\
read_dictionary : list, default None
    List of names or column paths (for nested types) to read directly
    as DictionaryArray. Only supported for BYTE_ARRAY storage. To read
    a flat column as dictionary-encoded pass the column name. For
    nested types, you must pass the full column "path", which could be
    something like level1.level2.list.item. Refer to the Parquet
    file's schema to obtain the paths.
memory_map : bool, default False
    If the source is a file path, use a memory map to read file, which can
    improve performance in some environments.
buffer_size : int, default 0
    If positive, perform read buffering when deserializing individual
    column chunks. Otherwise IO calls are unbuffered.
partitioning : Partitioning or str or list of str, default "hive"
    The partitioning scheme for a partitioned dataset. The default of "hive"
    assumes directory names with key=value pairs like "/year=2009/month=11".
    In addition, a scheme like "/2009/11" is also supported, in which case
    you need to specify the field names or a full schema. See the
    ``pyarrow.dataset.partitioning()`` function for more details."""
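
# Illustrative sketch, not part of the original module: how the DNF filter
# syntax documented in _DNF_filter_doc maps onto a hive-partitioned dataset
# laid out like root/year=2020/month=1/part-0.parquet. The layout and the
# 'year', 'month' and 'value' column names are hypothetical.
def _example_filtered_dataset_read(root_path):
    # (year == 2020 AND month in {1, 2}) OR (year == 2019)
    filters = [
        [('year', '=', 2020), ('month', 'in', {1, 2})],
        [('year', '=', 2019)],
    ]
    # With use_legacy_dataset=False the filters can also prune rows inside
    # files; the legacy implementation only prunes on partition directories.
    dataset = ParquetDataset(root_path, filters=filters,
                             use_legacy_dataset=False)
    return dataset.read(columns=['value'])
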
class ParquetDataset:

    __doc__ = """
Encapsulates details of reading a complete Parquet dataset possibly
consisting of multiple files and partitions in subdirectories.

Parameters
----------
path_or_paths : str or List[str]
    A directory name, single file name, or list of file names.
filesystem : FileSystem, default None
    If nothing passed, paths assumed to be found in the local on-disk
    filesystem.
metadata : pyarrow.parquet.FileMetaData
    Use metadata obtained elsewhere to validate file schemas.
schema : pyarrow.parquet.Schema
    Use schema obtained elsewhere to validate file schemas. Alternative to
    metadata parameter.
split_row_groups : bool, default False
    Divide files into pieces for each row group in the file.
validate_schema : bool, default True
    Check that individual file schemas are all the same / compatible.
filters : List[Tuple] or List[List[Tuple]] or None (default)
    Rows which do not match the filter predicate will be removed from scanned
    data. Partition keys embedded in a nested directory structure will be
    exploited to avoid loading files at all if they contain no matching rows.
    If `use_legacy_dataset` is True, filters can only reference partition
    keys and only a hive-style directory structure is supported. When
    setting `use_legacy_dataset` to False, also within-file level filtering
    and different partitioning schemes are supported.

    {1}
metadata_nthreads : int, default 1
    How many threads to allow the thread pool which is used to read the
    dataset metadata. Increasing this is helpful to read partitioned
    datasets.
{0}
use_legacy_dataset : bool, default True
    Set to False to enable the new code path (experimental, using the
    new Arrow Dataset API). Among other things, this allows to pass
    `filters` for all columns and not only the partition keys, enables
    different partitioning schemes, etc.
""".format(_read_docstring_common, _DNF_filter_doc)

    def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                metadata=None, split_row_groups=False, validate_schema=True,
                filters=None, metadata_nthreads=1, read_dictionary=None,
                memory_map=False, buffer_size=0, partitioning="hive",
                use_legacy_dataset=None):
        if use_legacy_dataset is None:
            # if a new filesystem is passed -> default to new implementation
            if isinstance(filesystem, FileSystem):
                use_legacy_dataset = False
            # otherwise the default is still the legacy implementation
            else:
                use_legacy_dataset = True

        if not use_legacy_dataset:
            return _ParquetDatasetV2(
                path_or_paths, filesystem=filesystem,
                filters=filters,
                partitioning=partitioning,
                read_dictionary=read_dictionary,
                memory_map=memory_map,
                buffer_size=buffer_size,
                # unsupported keywords
                schema=schema, metadata=metadata,
                split_row_groups=split_row_groups,
                validate_schema=validate_schema,
                metadata_nthreads=metadata_nthreads
            )
        self = object.__new__(cls)
        return self

    def __init__(self, path_or_paths, filesystem=None, schema=None,
                 metadata=None, split_row_groups=False, validate_schema=True,
                 filters=None, metadata_nthreads=1, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
                 use_legacy_dataset=True):
        if partitioning != "hive":
            raise ValueError(
                'Only "hive" for hive-like partitioning is supported when '
                'using use_legacy_dataset=True')
        self._metadata = _ParquetDatasetMetadata()
        a_path = path_or_paths
        if isinstance(a_path, list):
            a_path = a_path[0]

        self._metadata.fs, _ = _get_filesystem_and_path(filesystem, a_path)
        if isinstance(path_or_paths, list):
            self.paths = [_parse_uri(path) for path in path_or_paths]
        else:
            self.paths = _parse_uri(path_or_paths)

        self._metadata.read_dictionary = read_dictionary
        self._metadata.memory_map = memory_map
        self._metadata.buffer_size = buffer_size

        (self.pieces,
         self.partitions,
         self.common_metadata_path,
         self.metadata_path) = _make_manifest(
             path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
             open_file_func=partial(_open_dataset_file, self._metadata))

        if self.common_metadata_path is not None:
            with self.fs.open(self.common_metadata_path) as f:
                self._metadata.common_metadata = read_metadata(
                    f, memory_map=memory_map)
        else:
            self._metadata.common_metadata = None

        if metadata is None and self.metadata_path is not None:
            with self.fs.open(self.metadata_path) as f:
                self.metadata = read_metadata(f, memory_map=memory_map)
        else:
            self.metadata = metadata

        self.schema = schema

        self.split_row_groups = split_row_groups

        if split_row_groups:
            raise NotImplementedError("split_row_groups not yet implemented")

        if filters is not None:
            filters = _check_filters(filters)
            self._filter(filters)

        if validate_schema:
            self.validate_schemas()

    def equals(self, other):
        if not isinstance(other, ParquetDataset):
            raise TypeError('`other` must be an instance of ParquetDataset')

        if self.fs.__class__ != other.fs.__class__:
            return False
        for prop in ('paths', 'memory_map', 'read_dictionary', 'buffer_size',
                     'partitions', 'pieces', 'common_metadata_path',
                     'metadata_path', 'common_metadata', 'metadata', 'schema',
                     'split_row_groups'):
            if getattr(self, prop) != getattr(other, prop):
                return False
        return True

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def validate_schemas(self):
        if self.metadata is None and self.schema is None:
            if self.common_metadata is not None:
                self.schema = self.common_metadata.schema
            else:
                self.schema = self.pieces[0].get_metadata().schema
        elif self.schema is None:
            self.schema = self.metadata.schema

        # Verify schemas are all compatible
        dataset_schema = self.schema.to_arrow_schema()
        # Exclude the partition columns from the schema; they are provided
        # by the path, not the DatasetPiece
        if self.partitions is not None:
            for partition_name in self.partitions.partition_names:
                if dataset_schema.get_field_index(partition_name) != -1:
                    field_idx = dataset_schema.get_field_index(partition_name)
                    dataset_schema = dataset_schema.remove(field_idx)

        for piece in self.pieces:
            file_metadata = piece.get_metadata()
            file_schema = file_metadata.schema.to_arrow_schema()
            if not dataset_schema.equals(file_schema, check_metadata=False):
                raise ValueError('Schema in {!s} was different. \n'
                                 '{!s}\n\nvs\n\n{!s}'
                                 .format(piece, file_schema, dataset_schema))

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read multiple Parquet files as a single pyarrow.Table.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the file.
        use_threads : bool, default True
            Perform multi-threaded column reads
        use_pandas_metadata : bool, default False
            Passed through to each dataset piece.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        tables = []
        for piece in self.pieces:
            table = piece.read(columns=columns,
                               use_threads=use_threads,
                               partitions=self.partitions,
                               use_pandas_metadata=use_pandas_metadata)
            tables.append(table)

        all_data = lib.concat_tables(tables)

        if use_pandas_metadata:
            # Ensure the pandas metadata is set in the Table's schema so
            # that Table.to_pandas reconstructs the right index
            common_metadata = self._get_common_pandas_metadata()
            current_metadata = all_data.schema.metadata or {}

            if common_metadata and b'pandas' not in current_metadata:
                all_data = all_data.replace_schema_metadata(
                    {b'pandas': common_metadata})

        return all_data

    def read_pandas(self, **kwargs):
        """
        Read dataset including pandas metadata, if any. Other arguments passed
        through to ParquetDataset.read, see docstring for further details.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        return self.read(use_pandas_metadata=True, **kwargs)

    def _get_common_pandas_metadata(self):
        if self.common_metadata is None:
            return None

        keyvalues = self.common_metadata.metadata
        return keyvalues.get(b'pandas', None)

    def _filter(self, filters):
        accepts_filter = self.partitions.filter_accepts_partition

        def one_filter_accepts(piece, filter):
            return all(accepts_filter(part_key, filter, level)
                       for level, part_key in enumerate(piece.partition_keys))

        def all_filters_accept(piece):
            return any(all(one_filter_accepts(piece, f) for f in conjunction)
                       for conjunction in filters)

        self.pieces = [p for p in self.pieces if all_filters_accept(p)]

    fs = property(operator.attrgetter('_metadata.fs'))
    memory_map = property(operator.attrgetter('_metadata.memory_map'))
    read_dictionary = property(
        operator.attrgetter('_metadata.read_dictionary'))
    common_metadata = property(
        operator.attrgetter('_metadata.common_metadata'))
    buffer_size = property(operator.attrgetter('_metadata.buffer_size'))


def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
                   open_file_func=None):
    partitions = None
    common_metadata_path = None
    metadata_path = None

    if isinstance(path_or_paths, list) and len(path_or_paths) == 1:
        # Dask passes a directory as a list of length 1
        path_or_paths = path_or_paths[0]

    if _is_path_like(path_or_paths) and fs.isdir(path_or_paths):
        manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                   open_file_func=open_file_func,
                                   pathsep=getattr(fs, 'pathsep', '/'),
                                   metadata_nthreads=metadata_nthreads)
        common_metadata_path = manifest.common_metadata_path
        metadata_path = manifest.metadata_path
        pieces = manifest.pieces
        partitions = manifest.partitions
    else:
        if not isinstance(path_or_paths, list):
            path_or_paths = [path_or_paths]

        # List of paths
        if len(path_or_paths) == 0:
            raise ValueError('Must pass at least one file path')

        pieces = []
        for path in path_or_paths:
            if not fs.isfile(path):
                raise OSError('Passed non-file path: {}'.format(path))
            piece = ParquetDatasetPiece(path, open_file_func=open_file_func)
            pieces.append(piece)

    return pieces, partitions, common_metadata_path, metadata_path


class _ParquetDatasetV2:
    """
    ParquetDataset shim using the Dataset API under the hood.
    """

    def __init__(self, path_or_paths, filesystem=None, filters=None,
                 partitioning="hive", read_dictionary=None, buffer_size=None,
                 memory_map=False, ignore_prefixes=None, **kwargs):
        import pyarrow.dataset as ds

        # Raise error for not supported keywords
        for keyword, default in [
                ("schema", None), ("metadata", None),
                ("split_row_groups", False), ("validate_schema", True),
                ("metadata_nthreads", 1)]:
            if keyword in kwargs and kwargs[keyword] is not default:
                raise ValueError(
                    "Keyword '{0}' is not yet supported with the new "
                    "Dataset API".format(keyword))

        # map format arguments
        read_options = {}
        if buffer_size:
            read_options.update(use_buffered_stream=True,
                                buffer_size=buffer_size)
        if read_dictionary is not None:
            read_options.update(dictionary_columns=read_dictionary)

        # map filters to Expressions
        self._filters = filters
        self._filter_expression = filters and _filters_to_expression(filters)

        # map old filesystems to the new filesystem API
        if filesystem is not None:
            filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map)
        elif filesystem is None and memory_map:
            # if memory_map is specified, assume local file system (string
            # path can in principle be URI for any filesystem)
            filesystem = LocalFileSystem(use_mmap=memory_map)

        # check for single-fragment dataset
        single_file = None
        if isinstance(path_or_paths, list):
            if len(path_or_paths) == 1:
                single_file = path_or_paths[0]
        else:
            if _is_path_like(path_or_paths):
                path_or_paths = _stringify_path(path_or_paths)
                if filesystem is None:
                    # path might be a URI describing the FileSystem as well
                    try:
                        filesystem, path_or_paths = FileSystem.from_uri(
                            path_or_paths)
                    except ValueError:
                        filesystem = LocalFileSystem(use_mmap=memory_map)
                if filesystem.get_file_info(path_or_paths).is_file:
                    single_file = path_or_paths
            else:
                single_file = path_or_paths

        if single_file is not None:
            self._enable_parallel_column_conversion = True
            read_options.update(enable_parallel_column_conversion=True)

            parquet_format = ds.ParquetFileFormat(read_options=read_options)
            fragment = parquet_format.make_fragment(single_file, filesystem)

            self._dataset = ds.FileSystemDataset(
                [fragment], schema=fragment.physical_schema,
                format=parquet_format,
                filesystem=fragment.filesystem)
            return
        else:
            self._enable_parallel_column_conversion = False

        parquet_format = ds.ParquetFileFormat(read_options=read_options)

        # check partitioning to enable dictionary encoding
        if partitioning == "hive":
            partitioning = ds.HivePartitioning.discover(infer_dictionary=True)

        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                   format=parquet_format,
                                   partitioning=partitioning,
                                   ignore_prefixes=ignore_prefixes)

    @property
    def schema(self):
        return self._dataset.schema

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read (multiple) Parquet files as a single pyarrow.Table.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the dataset. The partition fields
            are not automatically included (in contrast to when setting
            ``use_legacy_dataset=True``).
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns).
        """
        # if use_pandas_metadata, include index columns in the selection so
        # they can be restored in the pandas result
        metadata = self.schema.metadata
        if columns is not None and use_pandas_metadata:
            if metadata and b'pandas' in metadata:
                # RangeIndex can be represented as dict instead of column name
                index_columns = [
                    col for col in _get_pandas_index_columns(metadata)
                    if not isinstance(col, dict)
                ]
                columns = columns + list(set(index_columns) - set(columns))

        if self._enable_parallel_column_conversion and use_threads:
            # Single-file dataset: per-column parallelism is already enabled,
            # so avoid additional per-fragment threading
            use_threads = False

        table = self._dataset.to_table(
            columns=columns, filter=self._filter_expression,
            use_threads=use_threads)

        # restore the pandas metadata, which gets lost when a specific
        # `columns` selection is done in to_table
        if use_pandas_metadata:
            if metadata and b'pandas' in metadata:
                new_metadata = table.schema.metadata or {}
                new_metadata.update({b'pandas': metadata[b'pandas']})
                table = table.replace_schema_metadata(new_metadata)

        return table

    def read_pandas(self, **kwargs):
        """
        Read dataset including pandas metadata, if any. Other arguments passed
        through to ParquetDataset.read, see docstring for further details.
        """
        return self.read(use_pandas_metadata=True, **kwargs)

    @property
    def pieces(self):
        return list(self._dataset.get_fragments())


_read_table_docstring = """
{0}

Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
    If a string passed, can be a single file name or directory name. For
    file-like objects, only read a single file. Use pyarrow.BufferReader to
    read a file contained in a bytes or buffer-like object.
columns : list
    If not None, only these columns will be read from the file. A column
    name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
    'a.c', and 'a.d.e'.
use_threads : bool, default True
    Perform multi-threaded column reads.
metadata : FileMetaData
    If separately computed
{1}
use_legacy_dataset : bool, default False
    By default, `read_table` uses the new Arrow Datasets API since
    pyarrow 1.0.0. Among other things, this allows to pass `filters` for all
    columns and not only the partition keys, enables different partitioning
    schemes, etc.
    Set to True to use the legacy behaviour.
ignore_prefixes : list, optional
    Files matching any of these prefixes will be ignored by the
    discovery process if use_legacy_dataset=False.
    This is matched to the basename of a path.
    By default this is ['.', '_'].
    Note that discovery happens only if a directory is passed as source.
filesystem : FileSystem, default None
    If nothing passed, paths assumed to be found in the local on-disk
    filesystem.
filters : List[Tuple] or List[List[Tuple]] or None (default)
    Rows which do not match the filter predicate will be removed from scanned
    data. Partition keys embedded in a nested directory structure will be
    exploited to avoid loading files at all if they contain no matching rows.
    If `use_legacy_dataset` is True, filters can only reference partition
    keys and only a hive-style directory structure is supported. When
    setting `use_legacy_dataset` to False, also within-file level filtering
    and different partitioning schemes are supported.

    {3}

Returns
-------
{2}
"""


def read_table(source, columns=None, use_threads=True, metadata=None,
               use_pandas_metadata=False, memory_map=False,
               read_dictionary=None, filesystem=None, filters=None,
               buffer_size=0, partitioning="hive", use_legacy_dataset=False,
               ignore_prefixes=None):
    if not use_legacy_dataset:
        if metadata is not None:
            raise ValueError(
                "The 'metadata' keyword is no longer supported with the new "
                "datasets-based implementation. Specify "
                "'use_legacy_dataset=True' to temporarily recover the old "
                "behaviour.")
        try:
            dataset = _ParquetDatasetV2(
                source,
                filesystem=filesystem,
                partitioning=partitioning,
                memory_map=memory_map,
                read_dictionary=read_dictionary,
                buffer_size=buffer_size,
                filters=filters,
                ignore_prefixes=ignore_prefixes)
        except ImportError:
            # fall back on ParquetFile for simple cases when pyarrow.dataset
            # module is not available
            if filters is not None:
                raise ValueError(
                    "the 'filters' keyword is not supported when the "
                    "pyarrow.dataset module is not available")
            if partitioning != "hive":
                raise ValueError(
                    "the 'partitioning' keyword is not supported when the "
                    "pyarrow.dataset module is not available")
            filesystem, path = _resolve_filesystem_and_path(source, filesystem)
            if filesystem is not None:
                source = filesystem.open_input_file(path)
            dataset = ParquetFile(
                source, metadata=metadata, read_dictionary=read_dictionary,
                memory_map=memory_map, buffer_size=buffer_size)

        return dataset.read(columns=columns, use_threads=use_threads,
                            use_pandas_metadata=use_pandas_metadata)

    if ignore_prefixes is not None:
        raise ValueError(
            "The 'ignore_prefixes' keyword is only supported when "
            "use_legacy_dataset=False")

    if _is_path_like(source):
        pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map,
                            read_dictionary=read_dictionary,
                            buffer_size=buffer_size,
                            filesystem=filesystem, filters=filters,
                            partitioning=partitioning)
    else:
        pf = ParquetFile(source, metadata=metadata,
                         read_dictionary=read_dictionary,
                         memory_map=memory_map,
                         buffer_size=buffer_size)
    return pf.read(columns=columns, use_threads=use_threads,
                   use_pandas_metadata=use_pandas_metadata)


read_table.__doc__ = _read_table_docstring.format(
    """Read a Table from Parquet format

Note: starting with pyarrow 1.0, the default for `use_legacy_dataset` is
switched to False.""",
    "\n".join((_read_docstring_common,
               """use_pandas_metadata : bool, default False
    If True and file has custom pandas schema metadata, ensure that
    index columns are also loaded""")),
    """pyarrow.Table
    Content of the file as a table (of columns)""",
    _DNF_filter_doc)


def read_pandas(source, columns=None, use_threads=True, memory_map=False,
                metadata=None, filters=None, buffer_size=0,
                use_legacy_dataset=True, ignore_prefixes=None):
    return read_table(
        source, columns=columns, use_threads=use_threads,
        metadata=metadata, filters=filters,
        memory_map=memory_map, buffer_size=buffer_size,
        use_pandas_metadata=True, use_legacy_dataset=use_legacy_dataset,
        ignore_prefixes=ignore_prefixes)


read_pandas.__doc__ = _read_table_docstring.format(
    'Read a Table from Parquet format, also reading DataFrame\n'
    'index values if known in the file metadata',
    _read_docstring_common,
    """pyarrow.Table
    Content of the file as a Table of Columns, including DataFrame
    indexes as columns""",
    _DNF_filter_doc)
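
# Illustrative usage, not part of the original module. Paths, column names
# and filter values are hypothetical placeholders:
#
#   >>> table = read_table('dataset_root/', columns=['a', 'b'],
#   ...                    filters=[('a', '>=', 0)])
#   >>> df = read_pandas('single_file.parquet').to_pandas()
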
def write_table(table, where, row_group_size=None, version='1.0',
                use_dictionary=True, compression='snappy',
                write_statistics=True,
                use_deprecated_int96_timestamps=None,
                coerce_timestamps=None,
                allow_truncated_timestamps=False,
                data_page_size=None, flavor=None,
                filesystem=None,
                compression_level=None,
                use_byte_stream_split=False,
                data_page_version='1.0',
                **kwargs):
    row_group_size = kwargs.pop('chunk_size', row_group_size)
    use_int96 = use_deprecated_int96_timestamps
    try:
        with ParquetWriter(
                where, table.schema,
                filesystem=filesystem,
                version=version,
                flavor=flavor,
                use_dictionary=use_dictionary,
                write_statistics=write_statistics,
                coerce_timestamps=coerce_timestamps,
                data_page_size=data_page_size,
                allow_truncated_timestamps=allow_truncated_timestamps,
                compression=compression,
                use_deprecated_int96_timestamps=use_int96,
                compression_level=compression_level,
                use_byte_stream_split=use_byte_stream_split,
                data_page_version=data_page_version,
                **kwargs) as writer:
            writer.write_table(table, row_group_size=row_group_size)
    except Exception:
        if _is_path_like(where):
            try:
                os.remove(_stringify_path(where))
            except os.error:
                pass
        raise


write_table.__doc__ = """
Write a Table to Parquet format.

Parameters
----------
table : pyarrow.Table
where : string or pyarrow.NativeFile
row_group_size : int
    The number of rows per rowgroup
{}
""".format(_parquet_writer_arg_docs)


def _mkdir_if_not_exists(fs, path):
    if fs._isfilestore() and not fs.exists(path):
        try:
            fs.mkdir(path)
        except OSError:
            assert fs.exists(path)


def write_to_dataset(table, root_path, partition_cols=None,
                     partition_filename_cb=None, filesystem=None,
                     use_legacy_dataset=True, **kwargs):
    """Wrapper around parquet.write_table for writing a Table to
    Parquet format by partitions.
    For each combination of partition columns and values,
    subdirectories are created in the following manner:

    root_dir/
      group1=value1
        group2=value1
          <uuid>.parquet
        group2=value2
          <uuid>.parquet
      group1=valueN
        group2=value1
          <uuid>.parquet
        group2=valueN
          <uuid>.parquet

    Parameters
    ----------
    table : pyarrow.Table
    root_path : str, pathlib.Path
        The root directory of the dataset
    filesystem : FileSystem, default None
        If nothing passed, paths assumed to be found in the local on-disk
        filesystem
    partition_cols : list,
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given
    partition_filename_cb : callable,
        A callback function that takes the partition key(s) as an argument
        and allows you to override the partition filename. If nothing is
        passed, the filename will consist of a uuid.
    use_legacy_dataset : bool, default True
        Set to False to enable the new code path (experimental, using the
        new Arrow Dataset API). This is more efficient when using partition
        columns, but does not (yet) support `partition_filename_cb` and
        `metadata_collector` keywords.
    **kwargs : dict,
        Additional kwargs for write_table function. See docstring for
        `write_table` or `ParquetWriter` for more information.
        Using `metadata_collector` in kwargs allows one to collect the
        file metadata instances of dataset pieces. The file paths in the
        ColumnChunkMetaData will be set relative to `root_path`.
    """
    if not use_legacy_dataset:
        import pyarrow.dataset as ds

        # extract non-file-format options
        schema = kwargs.pop("schema", None)
        use_threads = kwargs.pop("use_threads", True)
        metadata_collector = kwargs.pop("metadata_collector", None)

        # raise for unsupported keywords
        msg = ("The '{}' argument is not supported with the new dataset "
               "implementation.")
        if metadata_collector is not None:
            raise ValueError(msg.format("metadata_collector"))
        if partition_filename_cb is not None:
            raise ValueError(msg.format("partition_filename_cb"))

        # map format arguments
        parquet_format = ds.ParquetFileFormat()
        write_options = parquet_format.make_write_options(**kwargs)

        # map old filesystems to the new filesystem API
        if filesystem is not None:
            filesystem = _ensure_filesystem(filesystem)

        partitioning = None
        if partition_cols:
            part_schema = table.select(partition_cols).schema
            partitioning = ds.partitioning(part_schema, flavor="hive")

        ds.write_dataset(
            table, root_path, filesystem=filesystem,
            format=parquet_format, file_options=write_options, schema=schema,
            partitioning=partitioning, use_threads=use_threads)
        return

    fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)

    _mkdir_if_not_exists(fs, root_path)

    metadata_collector = kwargs.pop('metadata_collector', None)

    if partition_cols is not None and len(partition_cols) > 0:
        df = table.to_pandas()
        partition_keys = [df[col] for col in partition_cols]
        data_df = df.drop(partition_cols, axis='columns')
        data_cols = df.columns.drop(partition_cols)
        if len(data_cols) == 0:
            raise ValueError('No data left to save outside partition columns')

        subschema = table.schema

        # Ensure the output schema does not contain the partition columns,
        # which are encoded in the directory structure instead
        for col in table.schema.names:
            if col in partition_cols:
                subschema = subschema.remove(subschema.get_field_index(col))

        for keys, subgroup in data_df.groupby(partition_keys):
            if not isinstance(keys, tuple):
                keys = (keys,)
            subdir = '/'.join(
                ['{colname}={value}'.format(colname=name, value=val)
                 for name, val in zip(partition_cols, keys)])
            subtable = pa.Table.from_pandas(subgroup, schema=subschema,
                                            safe=False)
            _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
            if partition_filename_cb:
                outfile = partition_filename_cb(keys)
            else:
                outfile = guid() + '.parquet'
            relative_path = '/'.join([subdir, outfile])
            full_path = '/'.join([root_path, relative_path])
            with fs.open(full_path, 'wb') as f:
                write_table(subtable, f,
                            metadata_collector=metadata_collector, **kwargs)
            if metadata_collector is not None:
                metadata_collector[-1].set_file_path(relative_path)
    else:
        if partition_filename_cb:
            outfile = partition_filename_cb(None)
        else:
            outfile = guid() + '.parquet'
        full_path = '/'.join([root_path, outfile])
        with fs.open(full_path, 'wb') as f:
            write_table(table, f,
                        metadata_collector=metadata_collector, **kwargs)
        if metadata_collector is not None:
            metadata_collector[-1].set_file_path(outfile)


def write_metadata(schema, where, metadata_collector=None, **kwargs):
    """
    Write metadata-only Parquet file from schema. This can be used with
    `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
    files.

    Parameters
    ----------
    schema : pyarrow.Schema
    where : string or pyarrow.NativeFile
    metadata_collector :
    **kwargs : dict,
        Additional kwargs for ParquetWriter class. See docstring for
        `ParquetWriter` for more information.

    Examples
    --------

    Write a dataset and collect metadata information.

    >>> metadata_collector = []
    >>> write_to_dataset(
    ...     table, root_path,
    ...     metadata_collector=metadata_collector, **writer_kwargs)

    Write the `_common_metadata` parquet file without row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_common_metadata', **writer_kwargs)

    Write the `_metadata` parquet file with row groups statistics.

    >>> write_metadata(
    ...     table.schema, root_path / '_metadata',
    ...     metadata_collector=metadata_collector, **writer_kwargs)
    """
    writer = ParquetWriter(where, schema, **kwargs)
    writer.close()

    if metadata_collector is not None:
        # ParquetWriter does not expose the metadata until the file is
        # written, so write it, read it back, and append the row groups
        metadata = read_metadata(where)
        for m in metadata_collector:
            metadata.append_row_groups(m)
        metadata.write_metadata_file(where)


def read_metadata(where, memory_map=False):
    """
    Read FileMetadata from footer of a single Parquet file.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    metadata : FileMetadata
    """
    return ParquetFile(where, memory_map=memory_map).metadata


def read_schema(where, memory_map=False):
    """
    Read effective Arrow schema from Parquet file metadata.

    Parameters
    ----------
    where : str (filepath) or file-like object
    memory_map : bool, default False
        Create memory map when the source is a file path.

    Returns
    -------
    schema : pyarrow.Schema
    """
    return ParquetFile(where, memory_map=memory_map).schema.to_arrow_schema()
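

# Illustrative sketch, not part of the original module: writing a partitioned
# dataset together with ``_common_metadata`` and ``_metadata`` sidecar files,
# combining write_to_dataset and write_metadata as described in the
# write_metadata docstring. ``table``, ``root_path`` and ``partition_col``
# are hypothetical caller-provided values.
def _example_write_partitioned_dataset(table, root_path, partition_col):
    metadata_collector = []
    write_to_dataset(table, root_path,
                     partition_cols=[partition_col],
                     metadata_collector=metadata_collector)
    # Schema-only sidecar, without row group statistics.
    write_metadata(table.schema, '/'.join([root_path, '_common_metadata']))
    # Full sidecar carrying the collected row group statistics.
    write_metadata(table.schema, '/'.join([root_path, '_metadata']),
                   metadata_collector=metadata_collector)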