[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

2020-05-26 Thread GitBox


jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r430367401



##
File path: cpp/src/arrow/dataset/file_parquet.cc
##
@@ -380,77 +317,316 @@ Result ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline bool RowGroupInfosAreComplete(const std::vector& 
infos) {
+  return !infos.empty() &&
+ std::all_of(infos.cbegin(), infos.cend(),
+ [](const RowGroupInfo& i) { return i.HasStatistics(); });
+}
+
+static inline std::vector FilterRowGroups(
+std::vector row_groups, const Expression& predicate) {
+  auto filter = [](const RowGroupInfo& info) {
+return !info.Satisfy(predicate);
+  };
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
+static inline Result> AugmentRowGroups(
+std::vector row_groups, parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  if (row_groups.empty()) {
+row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+if (!info.HasStatistics() && info.id() < num_row_groups) {
+  auto row_group = metadata->RowGroup(info.id());
+  info.set_num_rows(row_group->num_rows());
+  info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+}
+  };
+  std::for_each(row_groups.begin(), row_groups.end(), augment);
+
+  return row_groups;
+}
+
 Result ParquetFileFormat::ScanFile(
 const FileSource& source, std::shared_ptr options,
-std::shared_ptr context, const std::vector& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+std::shared_ptr context, std::vector 
row_groups) const {
+  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (row_groups_are_complete) {
+// Apply a pre-filtering if the user requested an explicit sub-set of
+// row-groups. In the case where a RowGroup doesn't have statistics
+// metadata, it will not be excluded.
+row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+if (row_groups.empty()) {
+  return MakeEmptyIterator>();
+}
+  }
+
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
 
-  for (int i : row_groups) {
-if (i >= reader->metadata()->num_row_groups()) {
-  return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-" only has ", 
reader->metadata()->num_row_groups(),
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+if (row_group.id() >= num_row_groups) {
+  return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+source.path(), " only has ", num_row_groups,
 " row groups");
 }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-   std::move(reader), 
std::move(arrow_properties),
-   row_groups);
+  if (!row_groups_are_complete) {
+ARROW_ASSIGN_OR_RAISE(row_groups,
+  AugmentRowGroups(std::move(row_groups), 
reader.get()));
+row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  }
+
+  if (row_groups.empty()) {
+return MakeEmptyIterator>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+   std::move(reader), 
std::move(row_groups));
 }
 
 Result> ParquetFileFormat::MakeFragment(
 FileSource source, std::shared_ptr partition_expression,
-std::vector row_groups) {
+std::vector row_groups) {
   return std::shared_ptr(
   new ParquetFileFragment(std::move(source), shared_from_this(),
   std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result> ParquetFileFormat::MakeFragment(
+FileSource source, std::shared_ptr partition_expression,
+std::vector row_groups) {
+  return std::shared_ptr(new ParquetFileFragment(
+  std::move(source), shared_from_this(), std::move(partition_expression),
+  

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

2020-05-23 Thread GitBox


jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r429523732



##
File path: python/pyarrow/tests/test_dataset.py
##
@@ -1468,6 +1468,7 @@ def test_parquet_dataset_factory(tempdir):
 root_path = tempdir / "test_parquet_dataset"
 metadata_path, table = _create_parquet_dataset_simple(root_path)
 dataset = ds.parquet_dataset(metadata_path)
+print(root_path.rglob("*"))

Review comment:
   ```suggestion
   print(list(root_path.rglob("*")))
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

2020-05-20 Thread GitBox


jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r428026117



##
File path: cpp/src/arrow/dataset/file_parquet.cc
##
@@ -380,77 +316,297 @@ Result ParquetFileFormat::ScanFile(
   return ScanFile(source, std::move(options), std::move(context), {});
 }
 
+static inline std::vector FilterRowGroups(
+std::vector row_groups, const Expression& predicate) {
+  // Keep the index of the last valid entry.
+  size_t idx = 0;
+  for (size_t i = 0; i < row_groups.size(); i++) {
+const auto& info = row_groups[i];
+if (info.Satisfy(predicate)) {
+  row_groups[idx++] = info;
+}
+  }
+  row_groups.resize(idx);
+  return row_groups;
+}
+
+static inline Result> AugmentAndFilter(
+std::vector row_groups, const Expression& predicate,
+parquet::arrow::FileReader* reader) {
+  auto metadata = reader->parquet_reader()->metadata();
+  auto manifest = reader->manifest();
+  auto num_row_groups = metadata->num_row_groups();
+
+  // Augment a RowGroup with statistics if missing.
+  auto augment = [&](RowGroupInfo& info) {
+auto id = info.id();
+if (!info.HasStatistics() && id < num_row_groups) {
+  auto row_group = metadata->RowGroup(info.id());
+  info.set_num_rows(row_group->num_rows());
+  info.set_statistics(RowGroupStatisticsAsExpression(*row_group, 
manifest));
+}
+  };
+
+  if (row_groups.empty()) {
+row_groups = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (auto& row_group : row_groups) {
+augment(row_group);
+  }
+
+  return FilterRowGroups(std::move(row_groups), predicate);
+}
+
 Result ParquetFileFormat::ScanFile(
 const FileSource& source, std::shared_ptr options,
-std::shared_ptr context, const std::vector& row_groups) 
const {
-  auto properties = MakeReaderProperties(*this, context->pool);
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
+std::shared_ptr context, std::vector 
row_groups) const {
+  // The following block is required to avoid any IO if all RowGroups are
+  // excluded due to prior statistics knowledge.
+  if (!row_groups.empty()) {
+// Apply a pre-filtering if the user requested an explicit sub-set of
+// row-groups. In the case where a RowGroup doesn't have statistics
+// metadata, it will not be excluded.
+row_groups = FilterRowGroups(row_groups, *options->filter);
+if (row_groups.empty()) {
+  return MakeEmptyIterator>();
+}
+  }
 
-  for (int i : row_groups) {
-if (i >= reader->metadata()->num_row_groups()) {
-  return Status::IndexError("trying to scan row group ", i, " but ", 
source.path(),
-" only has ", 
reader->metadata()->num_row_groups(),
+  // Open the reader and pay the real IO cost.
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
+
+  // Ensure RowGroups are indexing valid RowGroups before augmenting.
+  auto num_row_groups = reader->num_row_groups();
+  for (const auto& row_group : row_groups) {
+if (row_group.id() >= num_row_groups) {
+  return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
+source.path(), " only has ", num_row_groups,
 " row groups");
 }
   }
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, 
options->batch_size, *reader);
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
-   std::move(reader), 
std::move(arrow_properties),
-   row_groups);
+  ARROW_ASSIGN_OR_RAISE(row_groups,
+AugmentAndFilter(row_groups, *options->filter, 
reader.get()));
+
+  if (row_groups.empty()) {
+return MakeEmptyIterator>();
+  }
+
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
+   std::move(reader), 
std::move(row_groups));
 }
 
 Result> ParquetFileFormat::MakeFragment(
 FileSource source, std::shared_ptr partition_expression,
-std::vector row_groups) {
+std::vector row_groups) {
   return std::shared_ptr(
   new ParquetFileFragment(std::move(source), shared_from_this(),
   std::move(partition_expression), 
std::move(row_groups)));
 }
 
+Result> ParquetFileFormat::MakeFragment(
+FileSource source, std::shared_ptr partition_expression,
+std::vector row_groups) {
+  return std::shared_ptr(new ParquetFileFragment(
+  std::move(source), shared_from_this(), std::move(partition_expression),
+  RowGroupInfo::FromIdentifiers(row_groups)));
+}
+
 Result> ParquetFileFormat::MakeFragment(
 FileSource source, std::shared_ptr partition_expression) {
   return std::shared_ptr(new ParquetFileFragment(
   std::move(source), shared_from_this(), std::move(partition_expression), 
{}));
 }
 
-Result 

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7180: ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory

2020-05-19 Thread GitBox


jorisvandenbossche commented on a change in pull request #7180:
URL: https://github.com/apache/arrow/pull/7180#discussion_r427163007



##
File path: cpp/src/arrow/dataset/file_parquet.h
##
@@ -97,53 +103,167 @@ class ARROW_DS_EXPORT ParquetFileFormat : public 
FileFormat {
   Result ScanFile(const FileSource& source,
 std::shared_ptr options,
 std::shared_ptr context,
-const std::vector& row_groups) const;
+std::vector row_groups) 
const;
 
   using FileFormat::MakeFragment;
 
+  /// \brief Create a Fragment, restricted to the specified row groups.
   Result> MakeFragment(
-  FileSource source, std::shared_ptr partition_expression) 
override;
+  FileSource source, std::shared_ptr partition_expression,
+  std::vector row_groups);
 
-  /// \brief Create a Fragment, restricted to the specified row groups.
   Result> MakeFragment(
   FileSource source, std::shared_ptr partition_expression,
   std::vector row_groups);
 
-  /// \brief Split a ParquetFileFragment into a Fragment for each row group.
+  /// \brief Create a Fragment targeting all RowGroups.
+  Result> MakeFragment(
+  FileSource source, std::shared_ptr partition_expression) 
override;
+
+  /// \brief Return a FileReader on the given source.
+  Result> GetReader(
+  const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = 
NULLPTR) const;
+};
+
+/// \brief Represents a parquet's RowGroup with extra information.
+class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable {
+ public:
+  RowGroupInfo() : RowGroupInfo(-1) {}
+
+  /// \brief Construct a RowGroup from an identifier.
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr 
statistics)
+  : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector FromIdentifiers(const std::vector ids);
+  static std::vector FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///the filter.
+  /// If statistics are not provided, return 0.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr statistics) {
+statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result GetRowGroupFragments(
-  const ParquetFileFragment& fragment,
-  std::shared_ptr filter = scalar(true));
+  /// If the RowGroup was not initialized with statistics, it is deemed

Review comment:
   missing word at the end?

##
File path: python/pyarrow/dataset.py
##
@@ -443,6 +445,53 @@ def _union_dataset(children, schema=None, **kwargs):
 return UnionDataset(schema, children)
 
 
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+"""
+Create a FileSystemDataset from a `_metadata` file created via
+`pyarrow.parquet.write_metadata`.
+
+Parameters
+----------
+metadata_path : path,
+Path pointing to a single Parquet metadata file
+schema : Schema, optional
+Optionally provide the Schema for the Dataset, in which case it will
+not be inferred from the source.
+filesystem : FileSystem or URI string, default None
+If a single path is given as source and filesystem is None, then the
+filesystem will be inferred from the path.
+If a URI string is passed, then a filesystem object is constructed
+using the URI's optional path component as a directory prefix. See the
+examples below.
+Note that the URIs on Windows must follow 'file:///C:...' or
+'file:/C:...' patterns.
+format : ParquetFileFormat
+An instance of a ParquetFileFormat if special options need to be
+passed.
+
+Returns
+-------
+FileSystemDataset
+"""
+from pyarrow.fs import LocalFileSystem
+
+if not isinstance(metadata_path, str):
+raise ValueError("metadata_path argument must be a string")

Review comment:
   ```suggestion
   metadata_path = _stringify_path(metadata_path)
   ```
   
   (then