// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include #include #include #include "arrow/array.h" #include "arrow/chunked_array.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" #include "arrow/util/make_unique.h" #include "arrow/visitor_inline.h" namespace arrow { namespace internal { template class ConverterTrait> static Result> MakeConverter( std::shared_ptr type, typename BaseConverter::OptionsType options, MemoryPool* pool); template class Converter { public: using Self = Converter; using InputType = Input; using OptionsType = Options; virtual ~Converter() = default; Status Construct(std::shared_ptr type, OptionsType options, MemoryPool* pool) { type_ = std::move(type); options_ = std::move(options); return Init(pool); } virtual Status Append(InputType value) = 0; const std::shared_ptr& builder() const { return builder_; } const std::shared_ptr& type() const { return type_; } OptionsType options() const { return options_; } bool may_overflow() const { return may_overflow_; } virtual Status Reserve(int64_t additional_capacity) { return builder_->Reserve(additional_capacity); } Status AppendNull() { return builder_->AppendNull(); } virtual Result> ToArray() { return builder_->Finish(); } virtual Result> ToArray(int64_t length) { ARROW_ASSIGN_OR_RAISE(auto arr, this->ToArray()); return arr->Slice(0, length); } virtual Result> ToChunkedArray() { ARROW_ASSIGN_OR_RAISE(auto array, ToArray()); std::vector> chunks = {std::move(array)}; return std::make_shared(chunks); } protected: virtual Status Init(MemoryPool* pool) { return Status::OK(); } std::shared_ptr type_; std::shared_ptr builder_; OptionsType options_; bool may_overflow_ = false; }; template class PrimitiveConverter : public BaseConverter { public: using BuilderType = typename TypeTraits::BuilderType; protected: Status Init(MemoryPool* pool) override { this->builder_ = std::make_shared(this->type_, pool); // Narrow variable-sized binary types may overflow this->may_overflow_ = is_binary_like(this->type_->id()); primitive_type_ = checked_cast(this->type_.get()); primitive_builder_ = checked_cast(this->builder_.get()); return Status::OK(); } const ArrowType* primitive_type_; BuilderType* primitive_builder_; }; template class ConverterTrait> class ListConverter : public BaseConverter { public: using BuilderType = typename TypeTraits::BuilderType; using ConverterType = typename ConverterTrait::type; protected: Status Init(MemoryPool* pool) override { list_type_ = checked_cast(this->type_.get()); ARROW_ASSIGN_OR_RAISE(value_converter_, (MakeConverter( list_type_->value_type(), this->options_, pool))); this->builder_ = std::make_shared(pool, value_converter_->builder(), this->type_); list_builder_ = checked_cast(this->builder_.get()); // Narrow list types may overflow this->may_overflow_ = sizeof(typename ArrowType::offset_type) < sizeof(int64_t); return Status::OK(); } const ArrowType* list_type_; BuilderType* list_builder_; std::unique_ptr value_converter_; }; template class ConverterTrait> class StructConverter : public BaseConverter { public: using ConverterType = typename ConverterTrait::type; Status Reserve(int64_t additional_capacity) override { ARROW_RETURN_NOT_OK(this->builder_->Reserve(additional_capacity)); for (const auto& child : children_) { ARROW_RETURN_NOT_OK(child->Reserve(additional_capacity)); } return Status::OK(); } protected: Status Init(MemoryPool* pool) override { std::unique_ptr child_converter; std::vector> child_builders; struct_type_ = checked_cast(this->type_.get()); for (const auto& field : struct_type_->fields()) { ARROW_ASSIGN_OR_RAISE(child_converter, (MakeConverter( field->type(), this->options_, pool))); this->may_overflow_ |= child_converter->may_overflow(); child_builders.push_back(child_converter->builder()); children_.push_back(std::move(child_converter)); } this->builder_ = std::make_shared(this->type_, pool, std::move(child_builders)); struct_builder_ = checked_cast(this->builder_.get()); return Status::OK(); } const StructType* struct_type_; StructBuilder* struct_builder_; std::vector> children_; }; template class DictionaryConverter : public BaseConverter { public: using BuilderType = DictionaryBuilder; protected: Status Init(MemoryPool* pool) override { std::unique_ptr builder; ARROW_RETURN_NOT_OK(MakeDictionaryBuilder(pool, this->type_, NULLPTR, &builder)); this->builder_ = std::move(builder); this->may_overflow_ = false; dict_type_ = checked_cast(this->type_.get()); value_type_ = checked_cast(dict_type_->value_type().get()); value_builder_ = checked_cast(this->builder_.get()); return Status::OK(); } const DictionaryType* dict_type_; const ValueType* value_type_; BuilderType* value_builder_; }; template class ConverterTrait> struct MakeConverterImpl { template ::type> Status Visit(const T&) { out.reset(new ConverterType()); return out->Construct(std::move(type), std::move(options), pool); } Status Visit(const DictionaryType& t) { switch (t.value_type()->id()) { #define DICTIONARY_CASE(TYPE) \ case TYPE::type_id: \ out = internal::make_unique< \ typename ConverterTrait::template dictionary_type>(); \ break; DICTIONARY_CASE(BooleanType); DICTIONARY_CASE(Int8Type); DICTIONARY_CASE(Int16Type); DICTIONARY_CASE(Int32Type); DICTIONARY_CASE(Int64Type); DICTIONARY_CASE(UInt8Type); DICTIONARY_CASE(UInt16Type); DICTIONARY_CASE(UInt32Type); DICTIONARY_CASE(UInt64Type); DICTIONARY_CASE(FloatType); DICTIONARY_CASE(DoubleType); DICTIONARY_CASE(BinaryType); DICTIONARY_CASE(StringType); DICTIONARY_CASE(FixedSizeBinaryType); #undef DICTIONARY_CASE default: return Status::NotImplemented("DictionaryArray converter for type ", t.ToString(), " not implemented"); } return out->Construct(std::move(type), std::move(options), pool); } Status Visit(const DataType& t) { return Status::NotImplemented(t.name()); } std::shared_ptr type; typename BaseConverter::OptionsType options; MemoryPool* pool; std::unique_ptr out; }; template class ConverterTrait> static Result> MakeConverter( std::shared_ptr type, typename BaseConverter::OptionsType options, MemoryPool* pool) { MakeConverterImpl visitor{ std::move(type), std::move(options), pool, NULLPTR}; ARROW_RETURN_NOT_OK(VisitTypeInline(*visitor.type, &visitor)); return std::move(visitor.out); } template class Chunker { public: using InputType = typename Converter::InputType; explicit Chunker(std::unique_ptr converter) : converter_(std::move(converter)) {} Status Reserve(int64_t additional_capacity) { ARROW_RETURN_NOT_OK(converter_->Reserve(additional_capacity)); reserved_ += additional_capacity; return Status::OK(); } Status AppendNull() { auto status = converter_->AppendNull(); if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { if (converter_->builder()->length() == 0) { // Builder length == 0 means the individual element is too large to append. // In this case, no need to try again. return status; } ARROW_RETURN_NOT_OK(FinishChunk()); return converter_->AppendNull(); } ++length_; return status; } Status Append(InputType value) { auto status = converter_->Append(value); if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { if (converter_->builder()->length() == 0) { return status; } ARROW_RETURN_NOT_OK(FinishChunk()); return Append(value); } ++length_; return status; } Status FinishChunk() { ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_)); chunks_.push_back(chunk); // Reserve space for the remaining items. // Besides being an optimization, it is also required if the converter's // implementation relies on unsafe builder methods in converter->Append(). auto remaining = reserved_ - length_; Reset(); return Reserve(remaining); } Result> ToChunkedArray() { ARROW_RETURN_NOT_OK(FinishChunk()); return std::make_shared(chunks_); } protected: void Reset() { converter_->builder()->Reset(); length_ = 0; reserved_ = 0; } int64_t length_ = 0; int64_t reserved_ = 0; std::unique_ptr converter_; std::vector> chunks_; }; template static Result>> MakeChunker(std::unique_ptr converter) { return internal::make_unique>(std::move(converter)); } } // namespace internal } // namespace arrow