from __future__ import print_function from copy import copy import json import re import struct import warnings import numba import numpy as np import pandas as pd from six import integer_types from fastparquet.util import join_path, PANDAS_VERSION from .thrift_structures import write_thrift try: from pandas.api.types import is_categorical_dtype except ImportError: # Pandas <= 0.18.1 from pandas.core.common import is_categorical_dtype from .thrift_structures import parquet_thrift from .compression import compress_data from .converted_types import tobson from . import encoding, api, __version__ from .util import (default_open, default_mkdirs, PY2, STR_TYPE, check_column_names, metadata_from_many, created_by, get_column_metadata) from .speedups import array_encode_utf8, pack_byte_array MARKER = b'PAR1' NaT = np.timedelta64(None).tobytes() # require numpy version >= 1.7 nat = np.datetime64('NaT').view('int64') typemap = { # primitive type, converted type, bit width 'bool': (parquet_thrift.Type.BOOLEAN, None, 1), 'int32': (parquet_thrift.Type.INT32, None, 32), 'int64': (parquet_thrift.Type.INT64, None, 64), 'int8': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.INT_8, 8), 'int16': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.INT_16, 16), 'uint8': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_8, 8), 'uint16': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_16, 16), 'uint32': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_32, 32), 'uint64': (parquet_thrift.Type.INT64, parquet_thrift.ConvertedType.UINT_64, 64), 'float32': (parquet_thrift.Type.FLOAT, None, 32), 'float64': (parquet_thrift.Type.DOUBLE, None, 64), 'float16': (parquet_thrift.Type.FLOAT, None, 16), } revmap = {parquet_thrift.Type.INT32: np.int32, parquet_thrift.Type.INT64: np.int64, parquet_thrift.Type.FLOAT: np.float32, parquet_thrift.Type.DOUBLE: np.float64} def find_type(data, fixed_text=None, object_encoding=None, times='int64'): """ Get appropriate typecodes for column dtype Data conversion do not happen here, see convert(). The user is expected to transform their data into the appropriate dtype before saving to parquet, we will not make any assumptions for them. Known types that cannot be represented (must be first converted another type or to raw binary): float128, complex Parameters ---------- data: pd.Series fixed_text: int or None For str and bytes, the fixed-string length to use. If None, object column will remain variable length. object_encoding: None or infer|bytes|utf8|json|bson|bool|int|int32|float How to encode object type into bytes. If None, bytes is assumed; if 'infer', type is guessed from 10 first non-null values. times: 'int64'|'int96' Normal integers or 12-byte encoding for timestamps. Returns ------- - a thrift schema element - a thrift typecode to be passed to the column chunk writer - converted data (None if convert is False) """ dtype = data.dtype if dtype.name in typemap: type, converted_type, width = typemap[dtype.name] elif "S" in str(dtype)[:2] or "U" in str(dtype)[:2]: type, converted_type, width = (parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY, None, dtype.itemsize) elif dtype == "O": if object_encoding == 'infer': object_encoding = infer_object_encoding(data) if object_encoding == 'utf8': type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY, parquet_thrift.ConvertedType.UTF8, None) elif object_encoding in ['bytes', None]: type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY, None, None) elif object_encoding == 'json': type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY, parquet_thrift.ConvertedType.JSON, None) elif object_encoding == 'bson': type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY, parquet_thrift.ConvertedType.BSON, None) elif object_encoding == 'bool': type, converted_type, width = (parquet_thrift.Type.BOOLEAN, None, 1) elif object_encoding == 'int': type, converted_type, width = (parquet_thrift.Type.INT64, None, 64) elif object_encoding == 'int32': type, converted_type, width = (parquet_thrift.Type.INT32, None, 32) elif object_encoding == 'float': type, converted_type, width = (parquet_thrift.Type.DOUBLE, None, 64) else: raise ValueError('Object encoding (%s) not one of ' 'infer|utf8|bytes|json|bson|bool|int|int32|float' % object_encoding) if fixed_text: width = fixed_text type = parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY elif dtype.kind == "M": if times == 'int64': type, converted_type, width = ( parquet_thrift.Type.INT64, parquet_thrift.ConvertedType.TIMESTAMP_MICROS, None) elif times == 'int96': type, converted_type, width = (parquet_thrift.Type.INT96, None, None) else: raise ValueError( "Parameter times must be [int64|int96], not %s" % times) if hasattr(dtype, 'tz') and str(dtype.tz) != 'UTC': warnings.warn('Coercing datetimes to UTC') elif dtype.kind == "m": type, converted_type, width = (parquet_thrift.Type.INT64, parquet_thrift.ConvertedType.TIME_MICROS, None) else: raise ValueError("Don't know how to convert data type: %s" % dtype) se = parquet_thrift.SchemaElement( name=data.name, type_length=width, converted_type=converted_type, type=type, repetition_type=parquet_thrift.FieldRepetitionType.REQUIRED) return se, type def convert(data, se): """Convert data according to the schema encoding""" dtype = data.dtype type = se.type converted_type = se.converted_type if dtype.name in typemap: if type in revmap: out = data.values.astype(revmap[type], copy=False) elif type == parquet_thrift.Type.BOOLEAN: padded = np.lib.pad(data.values, (0, 8 - (len(data) % 8)), 'constant', constant_values=(0, 0)) out = np.packbits(padded.reshape(-1, 8)[:, ::-1].ravel()) elif dtype.name in typemap: out = data.values elif "S" in str(dtype)[:2] or "U" in str(dtype)[:2]: out = data.values elif dtype == "O": try: if converted_type == parquet_thrift.ConvertedType.UTF8: out = array_encode_utf8(data) elif converted_type is None: if type in revmap: out = data.values.astype(revmap[type], copy=False) elif type == parquet_thrift.Type.BOOLEAN: padded = np.lib.pad(data.values, (0, 8 - (len(data) % 8)), 'constant', constant_values=(0, 0)) out = np.packbits(padded.reshape(-1, 8)[:, ::-1].ravel()) else: out = data.values elif converted_type == parquet_thrift.ConvertedType.JSON: out = np.array([json.dumps(x).encode('utf8') for x in data], dtype="O") elif converted_type == parquet_thrift.ConvertedType.BSON: out = data.map(tobson).values if type == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY: out = out.astype('S%i' % se.type_length) except Exception as e: ct = parquet_thrift.ConvertedType._VALUES_TO_NAMES[ converted_type] if converted_type is not None else None raise ValueError('Error converting column "%s" to bytes using ' 'encoding %s. Original error: ' '%s' % (data.name, ct, e)) elif converted_type == parquet_thrift.ConvertedType.TIMESTAMP_MICROS: out = np.empty(len(data), 'int64') time_shift(data.values.view('int64'), out) elif converted_type == parquet_thrift.ConvertedType.TIME_MICROS: out = np.empty(len(data), 'int64') time_shift(data.values.view('int64'), out) elif type == parquet_thrift.Type.INT96 and dtype.kind == 'M': ns_per_day = (24 * 3600 * 1000000000) day = data.values.view('int64') // ns_per_day + 2440588 ns = (data.values.view('int64') % ns_per_day)# - ns_per_day // 2 out = np.empty(len(data), dtype=[('ns', 'i8'), ('day', 'i4')]) out['ns'] = ns out['day'] = day else: raise ValueError("Don't know how to convert data type: %s" % dtype) return out def infer_object_encoding(data): head = data[:10] if isinstance(data, pd.Index) else data.dropna()[:10] if all(isinstance(i, STR_TYPE) for i in head) and not PY2: return "utf8" elif PY2 and all(isinstance(i, unicode) for i in head): return "utf8" elif PY2 and all(isinstance(i, (str, bytes)) for i in head): return "bytes" elif all(isinstance(i, bytes) for i in head): return 'bytes' elif all(isinstance(i, (list, dict)) for i in head): return 'json' elif all(isinstance(i, bool) for i in head): return 'bool' elif all(isinstance(i, integer_types) for i in head): return 'int' elif all(isinstance(i, float) or isinstance(i, np.floating) for i in head): # You need np.floating here for pandas NaNs in object # columns with python floats. return 'float' else: raise ValueError("Can't infer object conversion type: %s" % head) @numba.njit(nogil=True) def time_shift(indata, outdata, factor=1000): # pragma: no cover for i in range(len(indata)): if indata[i] == nat: outdata[i] = nat else: outdata[i] = indata[i] // factor def encode_plain(data, se): """PLAIN encoding; returns byte representation""" out = convert(data, se) if se.type == parquet_thrift.Type.BYTE_ARRAY: return pack_byte_array(list(out)) else: return out.tobytes() @numba.njit(nogil=True) def encode_unsigned_varint(x, o): # pragma: no cover while x > 127: o.write_byte((x & 0x7F) | 0x80) x >>= 7 o.write_byte(x) @numba.jit(nogil=True) def zigzag(n): # pragma: no cover " 32-bit only " return (n << 1) ^ (n >> 31) @numba.njit(nogil=True) def encode_bitpacked_inv(values, width, o): # pragma: no cover bit = 16 - width right_byte_mask = 0b11111111 left_byte_mask = right_byte_mask << 8 bits = 0 for v in values: bits |= v << bit while bit <= 8: o.write_byte((bits & left_byte_mask) >> 8) bit += 8 bits = (bits & right_byte_mask) << 8 bit -= width if bit: o.write_byte((bits & left_byte_mask) >> 8) @numba.njit(nogil=True) def encode_bitpacked(values, width, o): # pragma: no cover """ Write values packed into width-bits each (which can be >8) values is a NumbaIO array (int32) o is a NumbaIO output array (uint8), size=(len(values)*width)/8, rounded up. """ bit_packed_count = (len(values) + 7) // 8 encode_unsigned_varint(bit_packed_count << 1 | 1, o) # write run header bit = 0 right_byte_mask = 0b11111111 bits = 0 for v in values: bits |= v << bit bit += width while bit >= 8: o.write_byte(bits & right_byte_mask) bit -= 8 bits >>= 8 if bit: o.write_byte(bits) def write_length(l, o): """ Put a 32-bit length into four bytes in o Equivalent to struct.pack('>= 8 def encode_rle_bp(data, width, o, withlength=False): """Write data into o using RLE/bitpacked hybrid data : values to encode (int32) width : bits-per-value, set by max(data) o : output encoding.Numpy8 withlength : bool If definitions/repetitions, length of data must be pre-written """ if withlength: start = o.loc o.loc = o.loc + 4 if True: # I don't know how one would choose between RLE and bitpack encode_bitpacked(data, width, o) if withlength: end = o.loc o.loc = start write_length(end - start, o) o.loc = end def encode_rle(data, se, fixed_text=None): if data.dtype.kind not in ['i', 'u']: raise ValueError('RLE/bitpack encoding only works for integers') if se.type_length in [8, 16]: o = encoding.Numpy8(np.empty(10, dtype=np.uint8)) bit_packed_count = (len(data) + 7) // 8 encode_unsigned_varint(bit_packed_count << 1 | 1, o) # write run header return o.so_far().tostring() + data.values.tostring() else: m = data.max() width = 0 while m: m >>= 1 width += 1 l = (len(data) * width + 7) // 8 + 10 o = encoding.Numpy8(np.empty(l, dtype='uint8')) encode_rle_bp(data, width, o) return o.so_far().tostring() def encode_dict(data, se): """ The data part of dictionary encoding is always int8, with RLE/bitpack """ width = data.values.dtype.itemsize * 8 o = encoding.Numpy8(np.empty(10, dtype=np.uint8)) o.write_byte(width) bit_packed_count = (len(data) + 7) // 8 encode_unsigned_varint(bit_packed_count << 1 | 1, o) # write run header return o.so_far().tostring() + data.values.tostring() encode = { 'PLAIN': encode_plain, 'RLE': encode_rle, 'PLAIN_DICTIONARY': encode_dict, # 'DELTA_BINARY_PACKED': encode_delta } def make_definitions(data, no_nulls): """For data that can contain NULLs, produce definition levels binary data: either bitpacked bools, or (if number of nulls == 0), single RLE block.""" temp = encoding.Numpy8(np.empty(10, dtype=np.uint8)) if no_nulls: # no nulls at all l = len(data) encode_unsigned_varint(l << 1, temp) temp.write_byte(1) block = struct.pack('= "0.25.0": start = index_cols.start stop = index_cols.stop step = index_cols.step else: start = index_cols._start stop = index_cols._stop step = index_cols._step index_cols = [{'name': index_cols.name, 'start': start, 'stop': stop, 'step': step, 'kind': 'range'}] pandas_metadata = {'index_columns': index_cols, 'columns': [], 'column_indexes': [{'name': data.columns.name, 'field_name': data.columns.name, 'pandas_type': 'mixed-integer', 'numpy_type': 'object', 'metadata': None}], 'creator': {'library': 'fastparquet', 'version': __version__}, 'pandas_version': pd.__version__,} root = parquet_thrift.SchemaElement(name='schema', num_children=0) meta = parquet_thrift.KeyValue() meta.key = 'pandas' fmd = parquet_thrift.FileMetaData(num_rows=len(data), schema=[root], version=1, created_by=created_by, row_groups=[], key_value_metadata=[meta]) object_encoding = object_encoding or {} for column in data.columns: if column in ignore_columns: continue pandas_metadata['columns'].append( get_column_metadata(data[column], column)) oencoding = (object_encoding if isinstance(object_encoding, STR_TYPE) else object_encoding.get(column, None)) fixed = None if fixed_text is None else fixed_text.get(column, None) if is_categorical_dtype(data[column].dtype): se, type = find_type(data[column].cat.categories, fixed_text=fixed, object_encoding=oencoding) se.name = column else: se, type = find_type(data[column], fixed_text=fixed, object_encoding=oencoding, times=times) col_has_nulls = has_nulls if has_nulls is None: se.repetition_type = data[column].dtype == "O" elif has_nulls is not True and has_nulls is not False: col_has_nulls = column in has_nulls if col_has_nulls: se.repetition_type = parquet_thrift.FieldRepetitionType.OPTIONAL fmd.schema.append(se) root.num_children += 1 meta.value = json.dumps(pandas_metadata, sort_keys=True) return fmd def write_simple(fn, data, fmd, row_group_offsets, compression, open_with, has_nulls, append=False): """ Write to one single file (for file_scheme='simple') """ if append: pf = api.ParquetFile(fn, open_with=open_with) if pf.file_scheme not in ['simple', 'empty']: raise ValueError('File scheme requested is simple, but ' 'existing file scheme is not') fmd = pf.fmd mode = 'rb+' else: mode = 'wb' with open_with(fn, mode) as f: if append: f.seek(-8, 2) head_size = struct.unpack('>> fastparquet.write('myfile.parquet', df) # doctest: +SKIP """ if str(has_nulls) == 'infer': has_nulls = None if isinstance(row_group_offsets, int): l = len(data) nparts = max((l - 1) // row_group_offsets + 1, 1) chunksize = max(min((l - 1) // nparts + 1, l), 1) row_group_offsets = list(range(0, l, chunksize)) if (write_index or write_index is None and not isinstance(data.index, pd.RangeIndex)): cols = set(data) data = data.reset_index() index_cols = [c for c in data if c not in cols] elif isinstance(data.index, pd.RangeIndex): # write_index=None, range to metadata index_cols = data.index else: # write_index=False index_cols = [] check_column_names(data.columns, partition_on, fixed_text, object_encoding, has_nulls) ignore = partition_on if file_scheme != 'simple' else [] fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore, fixed_text=fixed_text, object_encoding=object_encoding, times=times, index_cols=index_cols) if file_scheme == 'simple': write_simple(filename, data, fmd, row_group_offsets, compression, open_with, has_nulls, append) elif file_scheme in ['hive', 'drill']: if append: pf = api.ParquetFile(filename, open_with=open_with) if pf.file_scheme not in ['hive', 'empty', 'flat']: raise ValueError('Requested file scheme is %s, but ' 'existing file scheme is not.' % file_scheme) fmd = pf.fmd i_offset = find_max_part(fmd.row_groups) if tuple(partition_on) != tuple(pf.cats): raise ValueError('When appending, partitioning columns must' ' match existing data') else: i_offset = 0 fn = join_path(filename, '_metadata') mkdirs(filename) for i, start in enumerate(row_group_offsets): end = (row_group_offsets[i+1] if i < (len(row_group_offsets) - 1) else None) part = 'part.%i.parquet' % (i + i_offset) if partition_on: rgs = partition_on_columns( data[start:end], partition_on, filename, part, fmd, compression, open_with, mkdirs, with_field=file_scheme == 'hive' ) fmd.row_groups.extend(rgs) else: partname = join_path(filename, part) with open_with(partname, 'wb') as f2: rg = make_part_file(f2, data[start:end], fmd.schema, compression=compression, fmd=fmd) for chunk in rg.columns: chunk.file_path = part fmd.row_groups.append(rg) write_common_metadata(fn, fmd, open_with, no_row_groups=False) write_common_metadata(join_path(filename, '_common_metadata'), fmd, open_with) else: raise ValueError('File scheme should be simple|hive, not', file_scheme) def find_max_part(row_groups): """ Find the highest integer matching "**part.*.parquet" in referenced paths. """ paths = [c.file_path or "" for rg in row_groups for c in rg.columns] s = re.compile(r'.*part.(?P[\d]+).parquet$') matches = [s.match(path) for path in paths] nums = [int(match.groupdict()['i']) for match in matches if match] if nums: return max(nums) + 1 else: return 0 def partition_on_columns(data, columns, root_path, partname, fmd, compression, open_with, mkdirs, with_field=True): """ Split each row-group by the given columns Each combination of column values (determined by pandas groupby) will be written in structured directories. """ gb = data.groupby(columns) remaining = list(data) for column in columns: remaining.remove(column) if not remaining: raise ValueError("Cannot include all columns in partition_on") rgs = [] for key, group in zip(sorted(gb.indices), sorted(gb)): if group[1].empty: continue df = group[1][remaining] if not isinstance(key, tuple): key = (key,) if with_field: path = join_path(*( "%s=%s" % (name, val) for name, val in zip(columns, key) )) else: path = join_path(*("%s" % val for val in key)) relname = join_path(path, partname) mkdirs(join_path(root_path, path)) fullname = join_path(root_path, path, partname) with open_with(fullname, 'wb') as f2: rg = make_part_file(f2, df, fmd.schema, compression=compression, fmd=fmd) if rg is not None: for chunk in rg.columns: chunk.file_path = relname rgs.append(rg) return rgs def write_common_metadata(fn, fmd, open_with=default_open, no_row_groups=True): """ For hive-style parquet, write schema in special shared file Parameters ---------- fn: str Filename to write to fmd: thrift FileMetaData Information to write open_with: func To use to create writable file as f(path, mode) no_row_groups: bool (True) Strip out row groups from metadata before writing - used for "common metadata" files, containing only the schema. """ consolidate_categories(fmd) with open_with(fn, 'wb') as f: f.write(MARKER) if no_row_groups: fmd = copy(fmd) fmd.row_groups = [] foot_size = write_thrift(f, fmd) else: foot_size = write_thrift(f, fmd) f.write(struct.pack(b" cat['metadata'][ 'num_categories']: cat['metadata']['num_categories'] = int(ncats[0]) key_value.value = json.dumps(meta, sort_keys=True) def merge(file_list, verify_schema=True, open_with=default_open, root=False): """ Create a logical data-set out of multiple parquet files. The files referenced in file_list must either be in the same directory, or at the same level within a structured directory, where the directories give partitioning information. The schemas of the files should also be consistent. Parameters ---------- file_list: list of paths or ParquetFile instances verify_schema: bool (True) If True, will first check that all the schemas in the input files are identical. open_with: func Used for opening a file for writing as f(path, mode). If input list is ParquetFile instances, will be inferred from the first one of these. root: str If passing a list of files, the top directory of the data-set may be ambiguous for partitioning where the upmost field has only one value. Use this to specify the data'set root directory, if required. Returns ------- ParquetFile instance corresponding to the merged data. """ basepath, fmd = metadata_from_many(file_list, verify_schema, open_with, root=root) out_file = join_path(basepath, '_metadata') write_common_metadata(out_file, fmd, open_with, no_row_groups=False) out = api.ParquetFile(out_file, open_with=open_with) out_file = join_path(basepath, '_common_metadata') write_common_metadata(out_file, fmd, open_with) return out