// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include #include #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/compare.h" #include "arrow/util/functional.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" #include "arrow/util/visibility.h" namespace arrow { template class Iterator; template struct IterationTraits { /// \brief a reserved value which indicates the end of iteration. By /// default this is NULLPTR since most iterators yield pointer types. /// Specialize IterationTraits if different end semantics are required. static T End() { return T(NULLPTR); } }; template struct IterationTraits> { /// \brief by default when iterating through a sequence of optional, /// nullopt indicates the end of iteration. /// Specialize IterationTraits if different end semantics are required. static util::optional End() { return util::nullopt; } // TODO(bkietz) The range-for loop over Iterator> yields // Result> which is unnecessary (since only the unyielded end optional // is nullopt. Add IterationTraits::GetRangeElement() to handle this case }; /// \brief A generic Iterator that can return errors template class Iterator : public util::EqualityComparable> { public: /// \brief Iterator may be constructed from any type which has a member function /// with signature Status Next(T*); /// /// The argument is moved or copied to the heap and kept in a unique_ptr. Only /// its destructor and its Next method (which are stored in function pointers) are /// referenced after construction. /// /// This approach is used to dodge MSVC linkage hell (ARROW-6244, ARROW-6558) when using /// an abstract template base class: instead of being inlined as usual for a template /// function the base's virtual destructor will be exported, leading to multiple /// definition errors when linking to any other TU where the base is instantiated. template explicit Iterator(Wrapped has_next) : ptr_(new Wrapped(std::move(has_next)), Delete), next_(Next) {} Iterator() : ptr_(NULLPTR, [](void*) {}) {} /// \brief Return the next element of the sequence, IterationTraits::End() when the /// iteration is completed. Calling this on a default constructed Iterator /// will result in undefined behavior. Result Next() { return next_(ptr_.get()); } /// Pass each element of the sequence to a visitor. Will return any error status /// returned by the visitor, terminating iteration. template Status Visit(Visitor&& visitor) { const auto end = IterationTraits::End(); for (;;) { ARROW_ASSIGN_OR_RAISE(auto value, Next()); if (value == end) break; ARROW_RETURN_NOT_OK(visitor(std::move(value))); } return Status::OK(); } /// Iterators will only compare equal if they are both null. /// Equality comparability is required to make an Iterator of Iterators /// (to check for the end condition). bool Equals(const Iterator& other) const { return ptr_ == other.ptr_; } explicit operator bool() const { return ptr_ != NULLPTR; } class RangeIterator { public: RangeIterator() : value_(IterationTraits::End()) {} explicit RangeIterator(Iterator i) : value_(IterationTraits::End()), iterator_(std::make_shared(std::move(i))) { Next(); } bool operator!=(const RangeIterator& other) const { return value_ != other.value_; } RangeIterator& operator++() { Next(); return *this; } Result operator*() { ARROW_RETURN_NOT_OK(value_.status()); auto value = std::move(value_); value_ = IterationTraits::End(); return value; } private: void Next() { if (!value_.ok()) { value_ = IterationTraits::End(); return; } value_ = iterator_->Next(); } Result value_; std::shared_ptr iterator_; }; RangeIterator begin() { return RangeIterator(std::move(*this)); } RangeIterator end() { return RangeIterator(); } /// \brief Move every element of this iterator into a vector. Result> ToVector() { std::vector out; for (auto maybe_element : *this) { ARROW_ASSIGN_OR_RAISE(auto element, maybe_element); out.push_back(std::move(element)); } // ARROW-8193: On gcc-4.8 without the explicit move it tries to use the // copy constructor, which may be deleted on the elements of type T return std::move(out); } private: /// Implementation of deleter for ptr_: Casts from void* to the wrapped type and /// deletes that. template static void Delete(void* ptr) { delete static_cast(ptr); } /// Implementation of Next: Casts from void* to the wrapped type and invokes that /// type's Next member function. template static Result Next(void* ptr) { return static_cast(ptr)->Next(); } /// ptr_ is a unique_ptr to void with a custom deleter: a function pointer which first /// casts from void* to a pointer to the wrapped type then deletes that. std::unique_ptr ptr_; /// next_ is a function pointer which first casts from void* to a pointer to the wrapped /// type then invokes its Next member function. Result (*next_)(void*) = NULLPTR; }; template struct IterationTraits> { // The end condition for an Iterator of Iterators is a default constructed (null) // Iterator. static Iterator End() { return Iterator(); } }; template class FunctionIterator { public: explicit FunctionIterator(Fn fn) : fn_(std::move(fn)) {} Result Next() { return fn_(); } private: Fn fn_; }; /// \brief Construct an Iterator which invokes a callable on Next() template ::ValueType> Iterator MakeFunctionIterator(Fn fn) { return Iterator(FunctionIterator(std::move(fn))); } template Iterator MakeEmptyIterator() { return MakeFunctionIterator([]() -> Result { return IterationTraits::End(); }); } template Iterator MakeErrorIterator(Status s) { return MakeFunctionIterator([s]() -> Result { ARROW_RETURN_NOT_OK(s); return IterationTraits::End(); }); } /// \brief Simple iterator which yields the elements of a std::vector template class VectorIterator { public: explicit VectorIterator(std::vector v) : elements_(std::move(v)) {} Result Next() { if (i_ == elements_.size()) { return IterationTraits::End(); } return std::move(elements_[i_++]); } private: std::vector elements_; size_t i_ = 0; }; template Iterator MakeVectorIterator(std::vector v) { return Iterator(VectorIterator(std::move(v))); } /// \brief Simple iterator which yields *pointers* to the elements of a std::vector. /// This is provided to support T where IterationTraits::End is not specialized template class VectorPointingIterator { public: explicit VectorPointingIterator(std::vector v) : elements_(std::move(v)) {} Result Next() { if (i_ == elements_.size()) { return NULLPTR; } return &elements_[i_++]; } private: std::vector elements_; size_t i_ = 0; }; template Iterator MakeVectorPointingIterator(std::vector v) { return Iterator(VectorPointingIterator(std::move(v))); } /// \brief MapIterator takes ownership of an iterator and a function to apply /// on every element. The mapped function is not allowed to fail. template class MapIterator { public: explicit MapIterator(Fn map, Iterator it) : map_(std::move(map)), it_(std::move(it)) {} Result Next() { ARROW_ASSIGN_OR_RAISE(I i, it_.Next()); if (i == IterationTraits::End()) { return IterationTraits::End(); } return map_(std::move(i)); } private: Fn map_; Iterator it_; }; /// \brief MapIterator takes ownership of an iterator and a function to apply /// on every element. The mapped function is not allowed to fail. template , typename To = internal::call_traits::return_type> Iterator MakeMapIterator(Fn map, Iterator it) { return Iterator(MapIterator(std::move(map), std::move(it))); } /// \brief Like MapIterator, but where the function can fail. template , typename To = typename internal::call_traits::return_type::ValueType> Iterator MakeMaybeMapIterator(Fn map, Iterator it) { return Iterator(MapIterator(std::move(map), std::move(it))); } struct FilterIterator { enum Action { ACCEPT, REJECT }; template static Result> Reject() { return std::make_pair(IterationTraits::End(), REJECT); } template static Result> Accept(To out) { return std::make_pair(std::move(out), ACCEPT); } template static Result> MaybeAccept(Result maybe_out) { return std::move(maybe_out).Map(Accept); } template static Result> Error(Status s) { return s; } template class Impl { public: explicit Impl(Fn filter, Iterator it) : filter_(filter), it_(std::move(it)) {} Result Next() { To out = IterationTraits::End(); Action action; for (;;) { ARROW_ASSIGN_OR_RAISE(From i, it_.Next()); if (i == IterationTraits::End()) { return IterationTraits::End(); } ARROW_ASSIGN_OR_RAISE(std::tie(out, action), filter_(std::move(i))); if (action == ACCEPT) return out; } } private: Fn filter_; Iterator it_; }; }; /// \brief Like MapIterator, but where the function can fail or reject elements. template < typename Fn, typename From = typename internal::call_traits::argument_type<0, Fn>, typename Ret = typename internal::call_traits::return_type::ValueType, typename To = typename std::tuple_element<0, Ret>::type, typename Enable = typename std::enable_if::type, FilterIterator::Action>::value>::type> Iterator MakeFilterIterator(Fn filter, Iterator it) { return Iterator( FilterIterator::Impl(std::move(filter), std::move(it))); } /// \brief FlattenIterator takes an iterator generating iterators and yields a /// unified iterator that flattens/concatenates in a single stream. template class FlattenIterator { public: explicit FlattenIterator(Iterator> it) : parent_(std::move(it)) {} Result Next() { if (child_ == IterationTraits>::End()) { // Pop from parent's iterator. ARROW_ASSIGN_OR_RAISE(child_, parent_.Next()); // Check if final iteration reached. if (child_ == IterationTraits>::End()) { return IterationTraits::End(); } return Next(); } // Pop from child_ and check for depletion. ARROW_ASSIGN_OR_RAISE(T out, child_.Next()); if (out == IterationTraits::End()) { // Reset state such that we pop from parent on the recursive call child_ = IterationTraits>::End(); return Next(); } return out; } private: Iterator> parent_; Iterator child_ = IterationTraits>::End(); }; template Iterator MakeFlattenIterator(Iterator> it) { return Iterator(FlattenIterator(std::move(it))); } namespace detail { // A type-erased promise object for ReadaheadQueue. struct ARROW_EXPORT ReadaheadPromise { virtual ~ReadaheadPromise(); virtual void Call() = 0; }; template struct ReadaheadIteratorPromise : ReadaheadPromise { ~ReadaheadIteratorPromise() override {} explicit ReadaheadIteratorPromise(Iterator* it) : it_(it) {} void Call() override { assert(!called_); out_ = it_->Next(); called_ = true; } Iterator* it_; Result out_ = IterationTraits::End(); bool called_ = false; }; class ARROW_EXPORT ReadaheadQueue { public: explicit ReadaheadQueue(int readahead_queue_size); ~ReadaheadQueue(); Status Append(std::unique_ptr); Status PopDone(std::unique_ptr*); Status Pump(std::function()> factory); Status Shutdown(); void EnsureShutdownOrDie(); protected: class Impl; std::shared_ptr impl_; }; } // namespace detail /// \brief Readahead iterator that iterates on the underlying iterator in a /// separate thread, getting up to N values in advance. template class ReadaheadIterator { using PromiseType = typename detail::ReadaheadIteratorPromise; public: // Public default constructor creates an empty iterator ReadaheadIterator() : done_(true) {} ~ReadaheadIterator() { if (queue_) { // Make sure the queue doesn't call any promises after this object // is destroyed. queue_->EnsureShutdownOrDie(); } } ARROW_DEFAULT_MOVE_AND_ASSIGN(ReadaheadIterator); ARROW_DISALLOW_COPY_AND_ASSIGN(ReadaheadIterator); Result Next() { if (done_) { return IterationTraits::End(); } std::unique_ptr promise; ARROW_RETURN_NOT_OK(queue_->PopDone(&promise)); auto it_promise = static_cast(promise.get()); ARROW_RETURN_NOT_OK(queue_->Append(MakePromise())); ARROW_ASSIGN_OR_RAISE(auto out, it_promise->out_); if (out == IterationTraits::End()) { done_ = true; } return out; } static Result> Make(Iterator it, int readahead_queue_size) { ReadaheadIterator rh(std::move(it), readahead_queue_size); ARROW_RETURN_NOT_OK(rh.Pump()); return Iterator(std::move(rh)); } private: explicit ReadaheadIterator(Iterator it, int readahead_queue_size) : it_(new Iterator(std::move(it))), queue_(new detail::ReadaheadQueue(readahead_queue_size)) {} Status Pump() { return queue_->Pump([this]() { return MakePromise(); }); } std::unique_ptr MakePromise() { return std::unique_ptr(new PromiseType{it_.get()}); } // The underlying iterator is referenced by pointer in ReadaheadPromise, // so make sure it doesn't move. std::unique_ptr> it_; std::unique_ptr queue_; bool done_ = false; }; template Result> MakeReadaheadIterator(Iterator it, int readahead_queue_size) { return ReadaheadIterator::Make(std::move(it), readahead_queue_size); } } // namespace arrow