// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include #include #include #include #include #include #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/make_unique.h" namespace arrow { namespace dataset { const std::shared_ptr kBoringSchema = schema({ field("i32", int32()), field("i32_req", int32(), /*nullable=*/false), field("i64", int64()), field("date64", date64()), field("f32", float32()), field("f32_req", float32(), /*nullable=*/false), field("f64", float64()), field("bool", boolean()), field("str", utf8()), field("dict_str", dictionary(int32(), utf8())), field("ts_ns", timestamp(TimeUnit::NANO)), }); using fs::internal::GetAbstractPathExtension; using internal::checked_cast; using internal::checked_pointer_cast; using internal::TemporaryDir; class FileSourceFixtureMixin : public ::testing::Test { public: std::unique_ptr GetSource(std::shared_ptr buffer) { return internal::make_unique(std::move(buffer)); } }; template class GeneratedRecordBatch : public RecordBatchReader { public: GeneratedRecordBatch(std::shared_ptr schema, Gen gen) : schema_(std::move(schema)), gen_(gen) {} std::shared_ptr schema() const override { return schema_; } Status ReadNext(std::shared_ptr* batch) override { return gen_(batch); } private: std::shared_ptr schema_; Gen gen_; }; template std::unique_ptr> MakeGeneratedRecordBatch( std::shared_ptr schema, Gen&& gen) { return internal::make_unique>(schema, std::forward(gen)); } std::unique_ptr MakeGeneratedRecordBatch( std::shared_ptr schema, int64_t batch_size, int64_t batch_repetitions) { auto batch = ConstantArrayGenerator::Zeroes(batch_size, schema); int64_t i = 0; return MakeGeneratedRecordBatch( schema, [batch, i, batch_repetitions](std::shared_ptr* out) mutable { *out = i++ < batch_repetitions ? batch : nullptr; return Status::OK(); }); } void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) { ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); EXPECT_EQ(batch, nullptr); } class DatasetFixtureMixin : public ::testing::Test { public: /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragment. void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, bool ensure_drained = true) { ASSERT_OK_AND_ASSIGN(auto it, task->Execute()); ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr rhs) -> Status { std::shared_ptr lhs; RETURN_NOT_OK(expected->ReadNext(&lhs)); EXPECT_NE(lhs, nullptr); AssertBatchesEqual(*lhs, *rhs); return Status::OK(); })); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); } } /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_, ctx_)); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); } } /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragments of a dataset. void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset, bool ensure_drained = true) { ASSERT_OK_AND_ASSIGN(auto predicate, options_->filter.Bind(*dataset->schema())); ASSERT_OK_AND_ASSIGN(auto it, dataset->GetFragments(predicate)); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); return Status::OK(); })); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); } } /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by a scanner. void AssertScannerEquals(RecordBatchReader* expected, Scanner* scanner, bool ensure_drained = true) { ASSERT_OK_AND_ASSIGN(auto it, scanner->Scan()); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); } } /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by a dataset. void AssertDatasetEquals(RecordBatchReader* expected, Dataset* dataset, bool ensure_drained = true) { ASSERT_OK_AND_ASSIGN(auto builder, dataset->NewScan()); ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish()); AssertScannerEquals(expected, scanner.get()); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); } } protected: void SetSchema(std::vector> fields) { schema_ = schema(std::move(fields)); options_ = ScanOptions::Make(schema_); SetFilter(literal(true)); } void SetFilter(Expression filter) { ASSERT_OK_AND_ASSIGN(options_->filter, filter.Bind(*schema_)); } std::shared_ptr schema_; std::shared_ptr options_; std::shared_ptr ctx_ = std::make_shared(); }; /// \brief A dummy FileFormat implementation class DummyFileFormat : public FileFormat { public: explicit DummyFileFormat(std::shared_ptr schema = NULLPTR) : schema_(std::move(schema)) {} std::string type_name() const override { return "dummy"; } bool Equals(const FileFormat& other) const override { return type_name() == other.type_name() && schema_->Equals(checked_cast(other).schema_); } Result IsSupported(const FileSource& source) const override { return true; } Result> Inspect(const FileSource& source) const override { return schema_; } /// \brief Open a file for scanning (always returns an empty iterator) Result ScanFile(std::shared_ptr options, std::shared_ptr context, FileFragment* fragment) const override { return MakeEmptyIterator>(); } Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options) const override { return Status::NotImplemented("writing fragment of DummyFileFormat"); } std::shared_ptr DefaultWriteOptions() override { return nullptr; } protected: std::shared_ptr schema_; }; class JSONRecordBatchFileFormat : public FileFormat { public: using SchemaResolver = std::function(const FileSource&)>; explicit JSONRecordBatchFileFormat(std::shared_ptr schema) : resolver_([schema](const FileSource&) { return schema; }) {} explicit JSONRecordBatchFileFormat(SchemaResolver resolver) : resolver_(std::move(resolver)) {} bool Equals(const FileFormat& other) const override { return this == &other; } std::string type_name() const override { return "json_record_batch"; } /// \brief Return true if the given file extension Result IsSupported(const FileSource& source) const override { return true; } Result> Inspect(const FileSource& source) const override { return resolver_(source); } /// \brief Open a file for scanning Result ScanFile(std::shared_ptr options, std::shared_ptr context, FileFragment* fragment) const override { ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open()); ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize()); ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(size)); util::string_view view{*buffer}; ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source())); std::shared_ptr batch = RecordBatchFromJSON(schema, view); return ScanTaskIteratorFromRecordBatch({batch}, std::move(options), std::move(context)); } Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options) const override { return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat"); } std::shared_ptr DefaultWriteOptions() override { return nullptr; } protected: SchemaResolver resolver_; }; struct MakeFileSystemDatasetMixin { std::vector ParsePathList(const std::string& pathlist) { std::vector infos; std::stringstream ss(pathlist); std::string line; while (std::getline(ss, line)) { auto start = line.find_first_not_of(" \n\r\t"); if (start == std::string::npos) { continue; } line.erase(0, start); if (line.front() == '#') { continue; } if (line.back() == '/') { infos.push_back(fs::Dir(line)); continue; } infos.push_back(fs::File(line)); } return infos; } void MakeFileSystem(const std::vector& infos) { ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos)); } void MakeFileSystem(const std::vector& paths) { std::vector infos{paths.size()}; std::transform(paths.cbegin(), paths.cend(), infos.begin(), [](const std::string& p) { return fs::File(p); }); ASSERT_OK_AND_ASSIGN(fs_, fs::internal::MockFileSystem::Make(fs::kNoTime, infos)); } void MakeDataset(const std::vector& infos, Expression root_partition = literal(true), std::vector partitions = {}, std::shared_ptr s = kBoringSchema) { auto n_fragments = infos.size(); if (partitions.empty()) { partitions.resize(n_fragments, literal(true)); } MakeFileSystem(infos); auto format = std::make_shared(s); std::vector> fragments; for (size_t i = 0; i < n_fragments; i++) { const auto& info = infos[i]; if (!info.IsFile()) { continue; } ASSERT_OK_AND_ASSIGN(partitions[i], partitions[i].Bind(*s)); ASSERT_OK_AND_ASSIGN(auto fragment, format->MakeFragment({info, fs_}, partitions[i])); fragments.push_back(std::move(fragment)); } ASSERT_OK_AND_ASSIGN(root_partition, root_partition.Bind(*s)); ASSERT_OK_AND_ASSIGN(dataset_, FileSystemDataset::Make(s, root_partition, format, fs_, std::move(fragments))); } std::shared_ptr fs_; std::shared_ptr dataset_; std::shared_ptr options_ = ScanOptions::Make(schema({})); }; static const std::string& PathOf(const std::shared_ptr& fragment) { EXPECT_NE(fragment, nullptr); EXPECT_THAT(fragment->type_name(), "dummy"); return internal::checked_cast(*fragment).source().path(); } class TestFileSystemDataset : public ::testing::Test, public MakeFileSystemDatasetMixin {}; static std::vector PathsOf(const FragmentVector& fragments) { std::vector paths(fragments.size()); std::transform(fragments.begin(), fragments.end(), paths.begin(), PathOf); return paths; } void AssertFilesAre(const std::shared_ptr& dataset, std::vector expected) { auto fs_dataset = internal::checked_cast(dataset.get()); EXPECT_THAT(fs_dataset->files(), testing::UnorderedElementsAreArray(expected)); } void AssertFragmentsAreFromPath(FragmentIterator it, std::vector expected) { // Ordering is not guaranteed. EXPECT_THAT(PathsOf(IteratorToVector(std::move(it))), testing::UnorderedElementsAreArray(expected)); } static std::vector PartitionExpressionsOf(const FragmentVector& fragments) { std::vector partition_expressions; std::transform(fragments.begin(), fragments.end(), std::back_inserter(partition_expressions), [](const std::shared_ptr& fragment) { return fragment->partition_expression(); }); return partition_expressions; } void AssertFragmentsHavePartitionExpressions(std::shared_ptr dataset, std::vector expected) { ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments()); for (auto& expr : expected) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*dataset->schema())); } // Ordering is not guaranteed. EXPECT_THAT(PartitionExpressionsOf(IteratorToVector(std::move(fragment_it))), testing::UnorderedElementsAreArray(expected)); } struct ArithmeticDatasetFixture { static std::shared_ptr schema() { return ::arrow::schema({ field("i64", int64()), // ARROW-1644: Parquet can't write complex level // field("struct", struct_({ // // ARROW-2587: Parquet can't write struct with more // // than one field. // // field("i32", int32()), // field("str", utf8()), // })), field("u8", uint8()), field("list", list(int32())), field("bool", boolean()), }); } /// \brief Creates a single JSON record templated with n as follow. /// /// {"i64": n, "struct": {"i32": n, "str": "n"}, "u8": n "list": [n,n], "bool": n % /// 2}, static std::string JSONRecordFor(int64_t n) { std::stringstream ss; auto n_i32 = static_cast(n); ss << "{"; ss << "\"i64\": " << n << ", "; // ss << "\"struct\": {"; // { // // ss << "\"i32\": " << n_i32 << ", "; // ss << "\"str\": \"" << std::to_string(n) << "\""; // } // ss << "}, "; ss << "\"u8\": " << static_cast(n) << ", "; ss << "\"list\": [" << n_i32 << ", " << n_i32 << "], "; ss << "\"bool\": " << (static_cast(n % 2) ? "true" : "false"); ss << "}"; return ss.str(); } /// \brief Creates a JSON RecordBatch static std::string JSONRecordBatch(int64_t n) { DCHECK_GT(n, 0); auto record = JSONRecordFor(n); std::stringstream ss; ss << "[\n"; for (int64_t i = 1; i <= n; i++) { if (i != 1) { ss << "\n,"; } ss << record; } ss << "]\n"; return ss.str(); } static std::shared_ptr GetRecordBatch(int64_t n) { return RecordBatchFromJSON(ArithmeticDatasetFixture::schema(), JSONRecordBatch(n)); } static std::unique_ptr GetRecordBatchReader(int64_t n) { DCHECK_GT(n, 0); // Functor which generates `n` RecordBatch struct { Status operator()(std::shared_ptr* out) { *out = i++ < count ? GetRecordBatch(i) : nullptr; return Status::OK(); } int64_t i; int64_t count; } generator{0, n}; return MakeGeneratedRecordBatch(schema(), std::move(generator)); } }; class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { public: using PathAndContent = std::unordered_map; void MakeSourceDataset() { PathAndContent source_files; source_files["/dataset/year=2018/month=01/dat0.json"] = R"([ {"region": "NY", "model": "3", "sales": 742.0, "country": "US"}, {"region": "NY", "model": "S", "sales": 304.125, "country": "US"}, {"region": "NY", "model": "Y", "sales": 27.5, "country": "US"} ])"; source_files["/dataset/year=2018/month=01/dat1.json"] = R"([ {"region": "QC", "model": "3", "sales": 512, "country": "CA"}, {"region": "QC", "model": "S", "sales": 978, "country": "CA"}, {"region": "NY", "model": "X", "sales": 136.25, "country": "US"}, {"region": "QC", "model": "X", "sales": 1.0, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 69, "country": "CA"} ])"; source_files["/dataset/year=2019/month=01/dat0.json"] = R"([ {"region": "CA", "model": "3", "sales": 273.5, "country": "US"}, {"region": "CA", "model": "S", "sales": 13, "country": "US"}, {"region": "CA", "model": "X", "sales": 54, "country": "US"}, {"region": "QC", "model": "S", "sales": 10, "country": "CA"}, {"region": "CA", "model": "Y", "sales": 21, "country": "US"} ])"; source_files["/dataset/year=2019/month=01/dat1.json"] = R"([ {"region": "QC", "model": "3", "sales": 152.25, "country": "CA"}, {"region": "QC", "model": "X", "sales": 42, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 37, "country": "CA"} ])"; source_files["/dataset/.pesky"] = "garbage content"; auto mock_fs = std::make_shared(fs::kNoTime); for (const auto& f : source_files) { ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true)); } fs_ = mock_fs; /// schema for the whole dataset (both source and destination) source_schema_ = schema({ field("region", utf8()), field("model", utf8()), field("sales", float64()), field("year", int32()), field("month", int32()), field("country", utf8()), }); /// Dummy file format for source dataset. Note that it isn't partitioned on country auto source_format = std::make_shared( SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"})); fs::FileSelector s; s.base_dir = "/dataset"; s.recursive = true; FileSystemFactoryOptions options; options.selector_ignore_prefixes = {"."}; options.partitioning = std::make_shared( SchemaFromColumnNames(source_schema_, {"year", "month"})); ASSERT_OK_AND_ASSIGN(auto factory, FileSystemDatasetFactory::Make(fs_, s, source_format, options)); ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish()); scan_options_ = ScanOptions::Make(source_schema_); } void SetWriteOptions(std::shared_ptr file_write_options) { write_options_.file_write_options = file_write_options; write_options_.filesystem = fs_; write_options_.base_dir = "new_root/"; write_options_.basename_template = "dat_{i}"; } void DoWrite(std::shared_ptr desired_partitioning) { write_options_.partitioning = desired_partitioning; auto scanner = std::make_shared(dataset_, scan_options_, scan_context_); ASSERT_OK(FileSystemDataset::Write(write_options_, scanner)); // re-discover the written dataset fs::FileSelector s; s.recursive = true; s.base_dir = "/new_root"; FileSystemFactoryOptions factory_options; factory_options.partitioning = desired_partitioning; ASSERT_OK_AND_ASSIGN( auto factory, FileSystemDatasetFactory::Make(fs_, s, format_, factory_options)); ASSERT_OK_AND_ASSIGN(written_, factory->Finish()); } void TestWriteWithIdenticalPartitioningSchema() { DoWrite(std::make_shared( SchemaFromColumnNames(source_schema_, {"year", "month"}))); expected_files_["/new_root/2018/1/dat_0"] = R"([ {"region": "NY", "model": "3", "sales": 742.0, "country": "US"}, {"region": "NY", "model": "S", "sales": 304.125, "country": "US"}, {"region": "NY", "model": "Y", "sales": 27.5, "country": "US"}, {"region": "QC", "model": "3", "sales": 512, "country": "CA"}, {"region": "QC", "model": "S", "sales": 978, "country": "CA"}, {"region": "NY", "model": "X", "sales": 136.25, "country": "US"}, {"region": "QC", "model": "X", "sales": 1.0, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 69, "country": "CA"} ])"; expected_files_["/new_root/2019/1/dat_1"] = R"([ {"region": "CA", "model": "3", "sales": 273.5, "country": "US"}, {"region": "CA", "model": "S", "sales": 13, "country": "US"}, {"region": "CA", "model": "X", "sales": 54, "country": "US"}, {"region": "QC", "model": "S", "sales": 10, "country": "CA"}, {"region": "CA", "model": "Y", "sales": 21, "country": "US"}, {"region": "QC", "model": "3", "sales": 152.25, "country": "CA"}, {"region": "QC", "model": "X", "sales": 42, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 37, "country": "CA"} ])"; expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"region", "model", "sales", "country"}); AssertWrittenAsExpected(); } void TestWriteWithUnrelatedPartitioningSchema() { DoWrite(std::make_shared( SchemaFromColumnNames(source_schema_, {"country", "region"}))); // XXX first thing a user will be annoyed by: we don't support left // padding the month field with 0. expected_files_["/new_root/US/NY/dat_0"] = R"([ {"year": 2018, "month": 1, "model": "3", "sales": 742.0}, {"year": 2018, "month": 1, "model": "S", "sales": 304.125}, {"year": 2018, "month": 1, "model": "Y", "sales": 27.5}, {"year": 2018, "month": 1, "model": "X", "sales": 136.25} ])"; expected_files_["/new_root/CA/QC/dat_1"] = R"([ {"year": 2018, "month": 1, "model": "3", "sales": 512}, {"year": 2018, "month": 1, "model": "S", "sales": 978}, {"year": 2018, "month": 1, "model": "X", "sales": 1.0}, {"year": 2018, "month": 1, "model": "Y", "sales": 69}, {"year": 2019, "month": 1, "model": "S", "sales": 10}, {"year": 2019, "month": 1, "model": "3", "sales": 152.25}, {"year": 2019, "month": 1, "model": "X", "sales": 42}, {"year": 2019, "month": 1, "model": "Y", "sales": 37} ])"; expected_files_["/new_root/US/CA/dat_2"] = R"([ {"year": 2019, "month": 1, "model": "3", "sales": 273.5}, {"year": 2019, "month": 1, "model": "S", "sales": 13}, {"year": 2019, "month": 1, "model": "X", "sales": 54}, {"year": 2019, "month": 1, "model": "Y", "sales": 21} ])"; expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"model", "sales", "year", "month"}); AssertWrittenAsExpected(); } void TestWriteWithSupersetPartitioningSchema() { DoWrite(std::make_shared( SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"}))); // XXX first thing a user will be annoyed by: we don't support left // padding the month field with 0. expected_files_["/new_root/2018/1/US/NY/dat_0"] = R"([ {"model": "3", "sales": 742.0}, {"model": "S", "sales": 304.125}, {"model": "Y", "sales": 27.5}, {"model": "X", "sales": 136.25} ])"; expected_files_["/new_root/2018/1/CA/QC/dat_1"] = R"([ {"model": "3", "sales": 512}, {"model": "S", "sales": 978}, {"model": "X", "sales": 1.0}, {"model": "Y", "sales": 69} ])"; expected_files_["/new_root/2019/1/US/CA/dat_2"] = R"([ {"model": "3", "sales": 273.5}, {"model": "S", "sales": 13}, {"model": "X", "sales": 54}, {"model": "Y", "sales": 21} ])"; expected_files_["/new_root/2019/1/CA/QC/dat_3"] = R"([ {"model": "S", "sales": 10}, {"model": "3", "sales": 152.25}, {"model": "X", "sales": 42}, {"model": "Y", "sales": 37} ])"; expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"model", "sales"}); AssertWrittenAsExpected(); } void TestWriteWithEmptyPartitioningSchema() { DoWrite(std::make_shared( SchemaFromColumnNames(source_schema_, {}))); expected_files_["/new_root/dat_0"] = R"([ {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "3", "sales": 742.0}, {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "S", "sales": 304.125}, {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "Y", "sales": 27.5}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "3", "sales": 512}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "S", "sales": 978}, {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "X", "sales": 136.25}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "X", "sales": 1.0}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "Y", "sales": 69}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "3", "sales": 273.5}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "S", "sales": 13}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "X", "sales": 54}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "S", "sales": 10}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "Y", "sales": 21}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "3", "sales": 152.25}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "X", "sales": 42}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "Y", "sales": 37} ])"; expected_physical_schema_ = source_schema_; AssertWrittenAsExpected(); } void AssertWrittenAsExpected() { std::unordered_set expected_paths, actual_paths; for (const auto& file_contents : expected_files_) { expected_paths.insert(file_contents.first); } for (auto path : checked_pointer_cast(written_)->files()) { actual_paths.insert(std::move(path)); } EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths)); ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments()); for (auto maybe_fragment : written_fragments_it) { ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment); ASSERT_OK_AND_ASSIGN(auto actual_physical_schema, fragment->ReadPhysicalSchema()); AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema, check_metadata_); const auto& path = checked_pointer_cast(fragment)->source().path(); auto file_contents = expected_files_.find(path); if (file_contents == expected_files_.end()) { // file wasn't expected to be written at all; nothing to compare with continue; } ASSERT_OK_AND_ASSIGN(auto scanner, ScannerBuilder(actual_physical_schema, fragment, std::make_shared()) .Finish()); ASSERT_OK_AND_ASSIGN(auto actual_table, scanner->ToTable()); ASSERT_OK_AND_ASSIGN(actual_table, actual_table->CombineChunks()); std::shared_ptr actual_struct; for (auto maybe_batch : IteratorFromReader(std::make_shared(*actual_table))) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray()); } auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()), {file_contents->second}); AssertArraysEqual(*expected_struct, *actual_struct, /*verbose=*/true); } } bool check_metadata_ = true; std::shared_ptr source_schema_; std::shared_ptr format_; PathAndContent expected_files_; std::shared_ptr expected_physical_schema_; std::shared_ptr written_; FileSystemDatasetWriteOptions write_options_; std::shared_ptr scan_options_; std::shared_ptr scan_context_ = std::make_shared(); }; } // namespace dataset } // namespace arrow