bkietz commented on a change in pull request #7073: URL: https://github.com/apache/arrow/pull/7073#discussion_r418193373
########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -83,131 +83,67 @@ Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, - std::shared_ptr<fs::FileSystem> filesystem, - fs::PathForest forest, - ExpressionVector file_partitions) + std::vector<std::shared_ptr<FileFragment>> fragments) : Dataset(std::move(schema), std::move(root_partition)), - format_(std::move(format)), - filesystem_(std::move(filesystem)), - forest_(std::move(forest)), - partitions_(std::move(file_partitions)) { - DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size()); -} - -Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( - std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos) { - ExpressionVector partitions(infos.size(), scalar(true)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(infos), std::move(partitions)); -} + fragments_(std::move(fragments)) {} Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos, ExpressionVector partitions) { - ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), &partitions)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions)); -} + FragmentVector fragments) { + std::vector<std::shared_ptr<FileFragment>> file_fragments; + for (const auto& fragment : fragments) { + auto file_fragment = 
internal::checked_pointer_cast<FileFragment>(fragment); Review comment: This will crash (or invoke undefined behavior) if any of the fragments is not a FileFragment; we should check `fragment->type_name()` before casting. ########## File path: cpp/src/arrow/dataset/discovery.h ########## @@ -220,16 +220,20 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override; protected: - FileSystemDatasetFactory(std::shared_ptr<fs::FileSystem> filesystem, - fs::PathForest forest, std::shared_ptr<FileFormat> format, + FileSystemDatasetFactory(std::vector<std::string> paths, + std::shared_ptr<fs::FileSystem> filesystem, + std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options); Result<std::shared_ptr<Schema>> PartitionSchema(); + std::vector<std::string> paths_; std::shared_ptr<fs::FileSystem> fs_; - fs::PathForest forest_; std::shared_ptr<FileFormat> format_; FileSystemFactoryOptions options_; + + private: + util::optional<util::string_view> BaselessPath(util::string_view path); Review comment: ```suggestion util::optional<util::string_view> RemovePartitionBaseDir(util::string_view path); ``` ########## File path: cpp/src/arrow/dataset/discovery.cc ########## @@ -246,32 +235,25 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions } } - ExpressionVector partitions(forest_.size(), scalar(true)); std::shared_ptr<Partitioning> partitioning = options_.partitioning.partitioning(); if (partitioning == nullptr) { auto factory = options_.partitioning.factory(); ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema)); } - // apply partitioning to forest to derive partitions - auto apply_partitioning = [&](fs::PathForest::Ref ref) { - if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir, - ref.info().path())) { - auto segments = fs::internal::SplitAbstractPath(relative->to_string()); - - if (segments.size() > 0) { - auto segment_index = static_cast<int>(segments.size()) - 1; - auto 
maybe_partition = partitioning->Parse(segments.back(), segment_index); - - partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true)); - } + FragmentVector fragments; + for (const auto& path : paths_) { + std::shared_ptr<Expression> partition = scalar(true); + if (auto relative = BaselessPath(path)) { + std::string path_string{*relative}; + partition = partitioning->Parse(path_string).ValueOr(scalar(true)); Review comment: ```suggestion partition = partitioning->Parse(relative->to_string()).ValueOr(scalar(true)); ``` ########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -83,131 +83,67 @@ Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, - std::shared_ptr<fs::FileSystem> filesystem, - fs::PathForest forest, - ExpressionVector file_partitions) + std::vector<std::shared_ptr<FileFragment>> fragments) : Dataset(std::move(schema), std::move(root_partition)), - format_(std::move(format)), - filesystem_(std::move(filesystem)), - forest_(std::move(forest)), - partitions_(std::move(file_partitions)) { - DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size()); -} - -Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( - std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos) { - ExpressionVector partitions(infos.size(), scalar(true)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(infos), std::move(partitions)); -} + fragments_(std::move(fragments)) {} Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, 
std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos, ExpressionVector partitions) { - ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), &partitions)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions)); -} + FragmentVector fragments) { + std::vector<std::shared_ptr<FileFragment>> file_fragments; + for (const auto& fragment : fragments) { + auto file_fragment = internal::checked_pointer_cast<FileFragment>(fragment); Review comment: I'd object more to the fragile type erasure of the `fragments` parameter but we'll be following up on this for https://issues.apache.org/jira/browse/ARROW-8163 ########## File path: cpp/src/arrow/dataset/filter.h ########## @@ -188,6 +188,10 @@ class ARROW_DS_EXPORT Expression { /// This is a shortcut to check if the expression is neither null nor false. bool IsSatisfiable() const { return !IsNull() && !Equals(false); } + bool IsSatisfiableWith(const std::shared_ptr<Expression> other) const { Review comment: ```suggestion bool IsSatisfiableWith(const std::shared_ptr<Expression>& other) const { ``` ########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -83,131 +83,67 @@ Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, - std::shared_ptr<fs::FileSystem> filesystem, - fs::PathForest forest, - ExpressionVector file_partitions) + std::vector<std::shared_ptr<FileFragment>> fragments) : Dataset(std::move(schema), std::move(root_partition)), - format_(std::move(format)), - filesystem_(std::move(filesystem)), - forest_(std::move(forest)), - partitions_(std::move(file_partitions)) { - DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size()); -} - -Result<std::shared_ptr<FileSystemDataset>> 
FileSystemDataset::Make( - std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos) { - ExpressionVector partitions(infos.size(), scalar(true)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(infos), std::move(partitions)); -} + fragments_(std::move(fragments)) {} Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - std::vector<fs::FileInfo> infos, ExpressionVector partitions) { - ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), &partitions)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions)); -} + FragmentVector fragments) { + std::vector<std::shared_ptr<FileFragment>> file_fragments; + for (const auto& fragment : fragments) { + auto file_fragment = internal::checked_pointer_cast<FileFragment>(fragment); + file_fragments.push_back(std::move(file_fragment)); + } -Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make( - std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition, - std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem, - fs::PathForest forest, ExpressionVector partitions) { return std::shared_ptr<FileSystemDataset>(new FileSystemDataset( - std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions))); + std::move(schema), std::move(root_partition), std::move(file_fragments))); } Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema( std::shared_ptr<Schema> schema) const { RETURN_NOT_OK(CheckProjectable(*schema_, *schema)); 
return std::shared_ptr<Dataset>( - new FileSystemDataset(std::move(schema), partition_expression_, format_, - filesystem_, forest_, partitions_)); + new FileSystemDataset(std::move(schema), partition_expression_, fragments_)); } std::vector<std::string> FileSystemDataset::files() const { std::vector<std::string> files; - DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) { - if (ref.info().IsFile()) { - files.push_back(ref.info().path()); - } - return Status::OK(); - })); + for (const auto& fragment : fragments_) { + files.push_back(fragment->source().path()); + } return files; } std::string FileSystemDataset::ToString() const { std::string repr = "FileSystemDataset:"; - if (forest_.size() == 0) { + if (fragments_.empty()) { return repr + " []"; } - DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) { - repr += "\n" + ref.info().path(); + for (const auto& fragment : fragments_) { + repr += "\n" + fragment->source().path(); - if (!partitions_[ref.i]->Equals(true)) { - repr += ": " + partitions_[ref.i]->ToString(); + const auto& partition = fragment->partition_expression(); + if (!partition->Equals(true)) { + repr += ": " + partition->ToString(); } - - return Status::OK(); - })); + } return repr; } -std::shared_ptr<Expression> FoldingAnd(const std::shared_ptr<Expression>& l, - const std::shared_ptr<Expression>& r) { - if (l->Equals(true)) return r; - if (r->Equals(true)) return l; - return and_(l, r); -} - FragmentIterator FileSystemDataset::GetFragmentsImpl( std::shared_ptr<Expression> predicate) { FragmentVector fragments; - ExpressionVector fragment_partitions(forest_.size()); - - auto collect_fragments = [&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune { - auto partition = partitions_[ref.i]; - - // if available, copy parent's filter and projector - // (which are appropriately simplified and loaded with default values) - if (auto parent = ref.parent()) { - fragment_partitions[ref.i] = FoldingAnd(fragment_partitions[parent.i], partition); - } else { 
- fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partition); + for (const auto& fragment : fragments_) { Review comment: Follow-up for subtree pruning: https://issues.apache.org/jira/browse/ARROW-8658 ########## File path: python/pyarrow/_dataset.pyx ########## @@ -290,21 +292,23 @@ cdef class FileSystemDataset(Dataset): "got {2})".format(name, class_.__name__, type(arg)) ) - for info in filesystem.get_file_info(paths_or_selector): - c_file_infos.push_back(info.unwrap()) + infos = filesystem.get_file_info(paths_or_selector) if partitions is None: - partitions = [ - ScalarExpression(True) for _ in range(c_file_infos.size())] - for expr in partitions: - c_partitions.push_back(expr.unwrap()) + partitions = [ScalarExpression(True) for _ in range(len(infos))] Review comment: ```suggestion partitions = [ScalarExpression(True)] * len(infos) ``` ########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -221,42 +157,34 @@ Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write( filesystem = std::make_shared<fs::LocalFileSystem>(); } - std::vector<fs::FileInfo> files(plan.paths.size()); - ExpressionVector partition_expressions(plan.paths.size(), scalar(true)); auto task_group = scan_context->TaskGroup(); - auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir); auto extension = "." 
+ plan.format->type_name(); + FragmentVector fragments; for (size_t i = 0; i < plan.paths.size(); ++i) { const auto& op = plan.fragment_or_partition_expressions[i]; - if (util::holds_alternative<std::shared_ptr<Expression>>(op)) { - files[i].set_type(fs::FileType::Directory); - files[i].set_path(partition_base_dir + plan.paths[i]); + if (util::holds_alternative<std::shared_ptr<Fragment>>(op)) { Review comment: Since we're no longer doing anything with directories, these entries should be removed from `WritePlan::fragment_or_partition_expressions`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org