// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

// NOTE(review): the six standard-library header names below were stripped by
// extraction (bare `#include` directives); restored to the set this header's
// declarations require — confirm against upstream.
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/dataset/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/optional.h"

namespace arrow {
namespace dataset {

// ----------------------------------------------------------------------
// Partitioning

/// \brief Interface for parsing partition expressions from string partition
/// identifiers.
///
/// For example, the identifier "foo=5" might be parsed to an equality expression
/// between the "foo" field and the value 5.
///
/// Some partitionings may store the field names in a metadata
/// store instead of in file paths, for example
/// dataset_root/2009/11/... could be used when the partition fields
/// are "year" and "month"
///
/// Paths are consumed from left to right. Paths must be relative to
/// the root of a partition; path prefixes must be removed before passing
/// the path to a partitioning for parsing.
class ARROW_DS_EXPORT Partitioning { public: virtual ~Partitioning() = default; /// \brief The name identifying the kind of partitioning virtual std::string type_name() const = 0; /// \brief If the input batch shares any fields with this partitioning, /// produce sub-batches which satisfy mutually exclusive Expressions. struct PartitionedBatches { RecordBatchVector batches; std::vector expressions; }; virtual Result Partition( const std::shared_ptr& batch) const = 0; /// \brief Parse a path into a partition expression virtual Result Parse(const std::string& path) const = 0; virtual Result Format(const Expression& expr) const = 0; /// \brief A default Partitioning which always yields scalar(true) static std::shared_ptr Default(); const std::shared_ptr& schema() { return schema_; } protected: explicit Partitioning(std::shared_ptr schema) : schema_(std::move(schema)) {} std::shared_ptr schema_; }; struct PartitioningFactoryOptions { /// When inferring a schema for partition fields, yield dictionary encoded types /// instead of plain. This can be more efficient when materializing virtual /// columns, and Expressions parsed by the finished Partitioning will include /// dictionaries of all unique inspected values for each field. bool infer_dictionary = false; }; /// \brief PartitioningFactory provides creation of a partitioning when the /// specific schema must be inferred from available paths (no explicit schema is known). class ARROW_DS_EXPORT PartitioningFactory { public: virtual ~PartitioningFactory() = default; /// \brief The name identifying the kind of partitioning virtual std::string type_name() const = 0; /// Get the schema for the resulting Partitioning. /// This may reset internal state, for example dictionaries of unique representations. virtual Result> Inspect( const std::vector& paths) = 0; /// Create a partitioning using the provided schema /// (fields may be dropped). 
virtual Result> Finish( const std::shared_ptr& schema) const = 0; }; /// \brief Subclass for the common case of a partitioning which yields an equality /// expression for each segment class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { public: /// An unconverted equality expression consisting of a field name and the representation /// of a scalar value struct Key { std::string name, value; }; static Status SetDefaultValuesFromKeys(const Expression& expr, RecordBatchProjector* projector); Result Partition( const std::shared_ptr& batch) const override; Result Parse(const std::string& path) const override; Result Format(const Expression& expr) const override; protected: KeyValuePartitioning(std::shared_ptr schema, ArrayVector dictionaries) : Partitioning(std::move(schema)), dictionaries_(std::move(dictionaries)) { if (dictionaries_.empty()) { dictionaries_.resize(schema_->num_fields()); } } virtual std::vector ParseKeys(const std::string& path) const = 0; virtual Result FormatValues(const ScalarVector& values) const = 0; /// Convert a Key to a full expression. Result ConvertKey(const Key& key) const; ArrayVector dictionaries_; }; /// \brief DirectoryPartitioning parses one segment of a path for each field in its /// schema. All fields are required, so paths passed to DirectoryPartitioning::Parse /// must contain segments for each field. /// /// For example given schema the path "/2009/11" would be /// parsed to ("year"_ == 2009 and "month"_ == 11) class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning { public: // If a field in schema is of dictionary type, the corresponding element of dictionaries // must be contain the dictionary of values for that field. 
explicit DirectoryPartitioning(std::shared_ptr schema, ArrayVector dictionaries = {}) : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {} std::string type_name() const override { return "schema"; } static std::shared_ptr MakeFactory( std::vector field_names, PartitioningFactoryOptions = {}); private: std::vector ParseKeys(const std::string& path) const override; Result FormatValues(const ScalarVector& values) const override; }; /// \brief Multi-level, directory based partitioning /// originating from Apache Hive with all data files stored in the /// leaf directories. Data is partitioned by static values of a /// particular column in the schema. Partition keys are represented in /// the form $key=$value in directory names. /// Field order is ignored, as are missing or unrecognized field names. /// /// For example given schema the path /// "/day=321/ignored=3.4/year=2009" parses to ("year"_ == 2009 and "day"_ == 321) class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning { public: // If a field in schema is of dictionary type, the corresponding element of dictionaries // must be contain the dictionary of values for that field. 
explicit HivePartitioning(std::shared_ptr schema, ArrayVector dictionaries = {}) : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {} std::string type_name() const override { return "hive"; } static util::optional ParseKey(const std::string& segment); static std::shared_ptr MakeFactory( PartitioningFactoryOptions = {}); private: std::vector ParseKeys(const std::string& path) const override; Result FormatValues(const ScalarVector& values) const override; }; /// \brief Implementation provided by lambda or other callable class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning { public: using ParseImpl = std::function(const std::string&)>; using FormatImpl = std::function(const Expression&)>; FunctionPartitioning(std::shared_ptr schema, ParseImpl parse_impl, FormatImpl format_impl = NULLPTR, std::string name = "function") : Partitioning(std::move(schema)), parse_impl_(std::move(parse_impl)), format_impl_(std::move(format_impl)), name_(std::move(name)) {} std::string type_name() const override { return name_; } Result Parse(const std::string& path) const override { return parse_impl_(path); } Result Format(const Expression& expr) const override { if (format_impl_) { return format_impl_(expr); } return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning"); } Result Partition( const std::shared_ptr& batch) const override { return Status::NotImplemented("partitioning batches from ", type_name(), " Partitioning"); } private: ParseImpl parse_impl_; FormatImpl format_impl_; std::string name_; }; /// \brief Remove a prefix and the filename of a path. /// /// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> "year=2019"` ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix); /// \brief Vector version of StripPrefixAndFilename. 
ARROW_DS_EXPORT std::vector StripPrefixAndFilename( const std::vector& paths, const std::string& prefix); /// \brief Vector version of StripPrefixAndFilename. ARROW_DS_EXPORT std::vector StripPrefixAndFilename( const std::vector& files, const std::string& prefix); /// \brief Either a Partitioning or a PartitioningFactory class ARROW_DS_EXPORT PartitioningOrFactory { public: explicit PartitioningOrFactory(std::shared_ptr partitioning) : partitioning_(std::move(partitioning)) {} explicit PartitioningOrFactory(std::shared_ptr factory) : factory_(std::move(factory)) {} PartitioningOrFactory& operator=(std::shared_ptr partitioning) { return *this = PartitioningOrFactory(std::move(partitioning)); } PartitioningOrFactory& operator=(std::shared_ptr factory) { return *this = PartitioningOrFactory(std::move(factory)); } const std::shared_ptr& partitioning() const { return partitioning_; } const std::shared_ptr& factory() const { return factory_; } Result> GetOrInferSchema(const std::vector& paths); private: std::shared_ptr factory_; std::shared_ptr partitioning_; }; /// \brief Assemble lists of indices of identical rows. /// /// \param[in] by A StructArray whose columns will be used as grouping criteria. /// Top level nulls are invalid, as are empty criteria (no grouping /// columns). /// \return A array of type `struct>`, /// which is a mapping from unique rows (field "values") to lists of /// indices into `by` where that row appears (field "groupings"). 
/// /// For example, /// MakeGroupings([ /// {"a": "ex", "b": 0}, /// {"a": "ex", "b": 0}, /// {"a": "why", "b": 0}, /// {"a": "why", "b": 0}, /// {"a": "ex", "b": 0}, /// {"a": "why", "b": 1} /// ]) == [ /// {"values": {"a": "ex", "b": 0}, "groupings": [0, 1, 4]}, /// {"values": {"a": "why", "b": 0}, "groupings": [2, 3]}, /// {"values": {"a": "why", "b": 1}, "groupings": [5]} /// ] ARROW_DS_EXPORT Result> MakeGroupings(const StructArray& by); /// \brief Produce a ListArray whose slots are selections of `array` which correspond to /// the provided groupings. /// /// For example, /// ApplyGroupings([[0, 1, 4], [2, 3], [5]], [ /// {"a": "ex", "b": 0, "passenger": 0}, /// {"a": "ex", "b": 0, "passenger": 1}, /// {"a": "why", "b": 0, "passenger": 2}, /// {"a": "why", "b": 0, "passenger": 3}, /// {"a": "ex", "b": 0, "passenger": 4}, /// {"a": "why", "b": 1, "passenger": 5} /// ]) == [ /// [ /// {"a": "ex", "b": 0, "passenger": 0}, /// {"a": "ex", "b": 0, "passenger": 1}, /// {"a": "ex", "b": 0, "passenger": 4}, /// ], /// [ /// {"a": "why", "b": 0, "passenger": 2}, /// {"a": "why", "b": 0, "passenger": 3}, /// ], /// [ /// {"a": "why", "b": 1, "passenger": 5} /// ] /// ] ARROW_DS_EXPORT Result> ApplyGroupings(const ListArray& groupings, const Array& array); /// \brief Produce selections of a RecordBatch which correspond to the provided groupings. ARROW_DS_EXPORT Result ApplyGroupings(const ListArray& groupings, const std::shared_ptr& batch); } // namespace dataset } // namespace arrow