bkietz commented on a change in pull request #7073:
URL: https://github.com/apache/arrow/pull/7073#discussion_r418193373



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -83,131 +83,67 @@ Result<ScanTaskIterator> 
FileFragment::Scan(std::shared_ptr<ScanOptions> options
 
 FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,
                                      std::shared_ptr<Expression> 
root_partition,
-                                     std::shared_ptr<FileFormat> format,
-                                     std::shared_ptr<fs::FileSystem> 
filesystem,
-                                     fs::PathForest forest,
-                                     ExpressionVector file_partitions)
+                                     
std::vector<std::shared_ptr<FileFragment>> fragments)
     : Dataset(std::move(schema), std::move(root_partition)),
-      format_(std::move(format)),
-      filesystem_(std::move(filesystem)),
-      forest_(std::move(forest)),
-      partitions_(std::move(file_partitions)) {
-  DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size());
-}
-
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
-    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos) {
-  ExpressionVector partitions(infos.size(), scalar(true));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(infos), std::move(partitions));
-}
+      fragments_(std::move(fragments)) {}
 
 Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
     std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos, ExpressionVector partitions) {
-  ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), 
&partitions));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(forest), std::move(partitions));
-}
+    FragmentVector fragments) {
+  std::vector<std::shared_ptr<FileFragment>> file_fragments;
+  for (const auto& fragment : fragments) {
+    auto file_fragment = 
internal::checked_pointer_cast<FileFragment>(fragment);

Review comment:
       This will crash (undefined behavior) if any element of `fragments` is not a FileFragment; we should check 
fragment->type_name() before casting and return an error otherwise.

##########
File path: cpp/src/arrow/dataset/discovery.h
##########
@@ -220,16 +220,20 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public 
DatasetFactory {
   Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
 
  protected:
-  FileSystemDatasetFactory(std::shared_ptr<fs::FileSystem> filesystem,
-                           fs::PathForest forest, std::shared_ptr<FileFormat> 
format,
+  FileSystemDatasetFactory(std::vector<std::string> paths,
+                           std::shared_ptr<fs::FileSystem> filesystem,
+                           std::shared_ptr<FileFormat> format,
                            FileSystemFactoryOptions options);
 
   Result<std::shared_ptr<Schema>> PartitionSchema();
 
+  std::vector<std::string> paths_;
   std::shared_ptr<fs::FileSystem> fs_;
-  fs::PathForest forest_;
   std::shared_ptr<FileFormat> format_;
   FileSystemFactoryOptions options_;
+
+ private:
+  util::optional<util::string_view> BaselessPath(util::string_view path);

Review comment:
       ```suggestion
     util::optional<util::string_view> RemovePartitionBaseDir(util::string_view 
path);
   ```

##########
File path: cpp/src/arrow/dataset/discovery.cc
##########
@@ -246,32 +235,25 @@ Result<std::shared_ptr<Dataset>> 
FileSystemDatasetFactory::Finish(FinishOptions
     }
   }
 
-  ExpressionVector partitions(forest_.size(), scalar(true));
   std::shared_ptr<Partitioning> partitioning = 
options_.partitioning.partitioning();
   if (partitioning == nullptr) {
     auto factory = options_.partitioning.factory();
     ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
   }
 
-  // apply partitioning to forest to derive partitions
-  auto apply_partitioning = [&](fs::PathForest::Ref ref) {
-    if (auto relative = 
fs::internal::RemoveAncestor(options_.partition_base_dir,
-                                                     ref.info().path())) {
-      auto segments = fs::internal::SplitAbstractPath(relative->to_string());
-
-      if (segments.size() > 0) {
-        auto segment_index = static_cast<int>(segments.size()) - 1;
-        auto maybe_partition = partitioning->Parse(segments.back(), 
segment_index);
-
-        partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true));
-      }
+  FragmentVector fragments;
+  for (const auto& path : paths_) {
+    std::shared_ptr<Expression> partition = scalar(true);
+    if (auto relative = BaselessPath(path)) {
+      std::string path_string{*relative};
+      partition = partitioning->Parse(path_string).ValueOr(scalar(true));

Review comment:
       ```suggestion
         partition = 
partitioning->Parse(relative->to_string()).ValueOr(scalar(true));
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -83,131 +83,67 @@ Result<ScanTaskIterator> 
FileFragment::Scan(std::shared_ptr<ScanOptions> options
 
 FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,
                                      std::shared_ptr<Expression> 
root_partition,
-                                     std::shared_ptr<FileFormat> format,
-                                     std::shared_ptr<fs::FileSystem> 
filesystem,
-                                     fs::PathForest forest,
-                                     ExpressionVector file_partitions)
+                                     
std::vector<std::shared_ptr<FileFragment>> fragments)
     : Dataset(std::move(schema), std::move(root_partition)),
-      format_(std::move(format)),
-      filesystem_(std::move(filesystem)),
-      forest_(std::move(forest)),
-      partitions_(std::move(file_partitions)) {
-  DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size());
-}
-
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
-    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos) {
-  ExpressionVector partitions(infos.size(), scalar(true));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(infos), std::move(partitions));
-}
+      fragments_(std::move(fragments)) {}
 
 Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
     std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos, ExpressionVector partitions) {
-  ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), 
&partitions));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(forest), std::move(partitions));
-}
+    FragmentVector fragments) {
+  std::vector<std::shared_ptr<FileFragment>> file_fragments;
+  for (const auto& fragment : fragments) {
+    auto file_fragment = 
internal::checked_pointer_cast<FileFragment>(fragment);

Review comment:
       I'd object more strongly to the fragile type erasure of the `fragments` parameter, 
but we'll follow up on this in 
https://issues.apache.org/jira/browse/ARROW-8163

##########
File path: cpp/src/arrow/dataset/filter.h
##########
@@ -188,6 +188,10 @@ class ARROW_DS_EXPORT Expression {
   /// This is a shortcut to check if the expression is neither null nor false.
   bool IsSatisfiable() const { return !IsNull() && !Equals(false); }
 
+  bool IsSatisfiableWith(const std::shared_ptr<Expression> other) const {

Review comment:
       ```suggestion
     bool IsSatisfiableWith(const std::shared_ptr<Expression>& other) const {
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -83,131 +83,67 @@ Result<ScanTaskIterator> 
FileFragment::Scan(std::shared_ptr<ScanOptions> options
 
 FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,
                                      std::shared_ptr<Expression> 
root_partition,
-                                     std::shared_ptr<FileFormat> format,
-                                     std::shared_ptr<fs::FileSystem> 
filesystem,
-                                     fs::PathForest forest,
-                                     ExpressionVector file_partitions)
+                                     
std::vector<std::shared_ptr<FileFragment>> fragments)
     : Dataset(std::move(schema), std::move(root_partition)),
-      format_(std::move(format)),
-      filesystem_(std::move(filesystem)),
-      forest_(std::move(forest)),
-      partitions_(std::move(file_partitions)) {
-  DCHECK_EQ(static_cast<size_t>(forest_.size()), partitions_.size());
-}
-
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
-    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos) {
-  ExpressionVector partitions(infos.size(), scalar(true));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(infos), std::move(partitions));
-}
+      fragments_(std::move(fragments)) {}
 
 Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
     std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    std::vector<fs::FileInfo> infos, ExpressionVector partitions) {
-  ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), 
&partitions));
-  return Make(std::move(schema), std::move(root_partition), std::move(format),
-              std::move(filesystem), std::move(forest), std::move(partitions));
-}
+    FragmentVector fragments) {
+  std::vector<std::shared_ptr<FileFragment>> file_fragments;
+  for (const auto& fragment : fragments) {
+    auto file_fragment = 
internal::checked_pointer_cast<FileFragment>(fragment);
+    file_fragments.push_back(std::move(file_fragment));
+  }
 
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
-    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> 
filesystem,
-    fs::PathForest forest, ExpressionVector partitions) {
   return std::shared_ptr<FileSystemDataset>(new FileSystemDataset(
-      std::move(schema), std::move(root_partition), std::move(format),
-      std::move(filesystem), std::move(forest), std::move(partitions)));
+      std::move(schema), std::move(root_partition), 
std::move(file_fragments)));
 }
 
 Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
     std::shared_ptr<Schema> schema) const {
   RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
   return std::shared_ptr<Dataset>(
-      new FileSystemDataset(std::move(schema), partition_expression_, format_,
-                            filesystem_, forest_, partitions_));
+      new FileSystemDataset(std::move(schema), partition_expression_, 
fragments_));
 }
 
 std::vector<std::string> FileSystemDataset::files() const {
   std::vector<std::string> files;
 
-  DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) {
-    if (ref.info().IsFile()) {
-      files.push_back(ref.info().path());
-    }
-    return Status::OK();
-  }));
+  for (const auto& fragment : fragments_) {
+    files.push_back(fragment->source().path());
+  }
 
   return files;
 }
 
 std::string FileSystemDataset::ToString() const {
   std::string repr = "FileSystemDataset:";
 
-  if (forest_.size() == 0) {
+  if (fragments_.empty()) {
     return repr + " []";
   }
 
-  DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) {
-    repr += "\n" + ref.info().path();
+  for (const auto& fragment : fragments_) {
+    repr += "\n" + fragment->source().path();
 
-    if (!partitions_[ref.i]->Equals(true)) {
-      repr += ": " + partitions_[ref.i]->ToString();
+    const auto& partition = fragment->partition_expression();
+    if (!partition->Equals(true)) {
+      repr += ": " + partition->ToString();
     }
-
-    return Status::OK();
-  }));
+  }
 
   return repr;
 }
 
-std::shared_ptr<Expression> FoldingAnd(const std::shared_ptr<Expression>& l,
-                                       const std::shared_ptr<Expression>& r) {
-  if (l->Equals(true)) return r;
-  if (r->Equals(true)) return l;
-  return and_(l, r);
-}
-
 FragmentIterator FileSystemDataset::GetFragmentsImpl(
     std::shared_ptr<Expression> predicate) {
   FragmentVector fragments;
 
-  ExpressionVector fragment_partitions(forest_.size());
-
-  auto collect_fragments = [&](fs::PathForest::Ref ref) -> 
fs::PathForest::MaybePrune {
-    auto partition = partitions_[ref.i];
-
-    // if available, copy parent's filter and projector
-    // (which are appropriately simplified and loaded with default values)
-    if (auto parent = ref.parent()) {
-      fragment_partitions[ref.i] = FoldingAnd(fragment_partitions[parent.i], 
partition);
-    } else {
-      fragment_partitions[ref.i] = FoldingAnd(partition_expression_, 
partition);
+  for (const auto& fragment : fragments_) {

Review comment:
       Follow-up for subtree pruning: 
https://issues.apache.org/jira/browse/ARROW-8658

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -290,21 +292,23 @@ cdef class FileSystemDataset(Dataset):
                     "got {2})".format(name, class_.__name__, type(arg))
                 )
 
-        for info in filesystem.get_file_info(paths_or_selector):
-            c_file_infos.push_back(info.unwrap())
+        infos = filesystem.get_file_info(paths_or_selector)
 
         if partitions is None:
-            partitions = [
-                ScalarExpression(True) for _ in range(c_file_infos.size())]
-        for expr in partitions:
-            c_partitions.push_back(expr.unwrap())
+            partitions = [ScalarExpression(True) for _ in range(len(infos))]

Review comment:
       ```suggestion
               partitions = [ScalarExpression(True)] * len(infos)
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -221,42 +157,34 @@ Result<std::shared_ptr<FileSystemDataset>> 
FileSystemDataset::Write(
     filesystem = std::make_shared<fs::LocalFileSystem>();
   }
 
-  std::vector<fs::FileInfo> files(plan.paths.size());
-  ExpressionVector partition_expressions(plan.paths.size(), scalar(true));
   auto task_group = scan_context->TaskGroup();
-
   auto partition_base_dir = 
fs::internal::EnsureTrailingSlash(plan.partition_base_dir);
   auto extension = "." + plan.format->type_name();
 
+  FragmentVector fragments;
   for (size_t i = 0; i < plan.paths.size(); ++i) {
     const auto& op = plan.fragment_or_partition_expressions[i];
-    if (util::holds_alternative<std::shared_ptr<Expression>>(op)) {
-      files[i].set_type(fs::FileType::Directory);
-      files[i].set_path(partition_base_dir + plan.paths[i]);
+    if (util::holds_alternative<std::shared_ptr<Fragment>>(op)) {

Review comment:
       Since we're no longer doing anything with directories, these should be 
removed from `WritePlan::fragment_or_partition_expressions`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to