GitHub user aavbsouza added a comment to the discussion: It is possible to 
reduce peak memory usage when using datasets (to use predicate pushdown) when 
reading single parquet files

Hello @adamreeve, thanks for the input. I have tried changing the code to use 
`ParquetFragmentScanOptions`, as shown in this snippet:

```cpp
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> ReadFilteredParquetRecordReader(
    const std::string &parquet_path, const std::int64_t start, const std::int64_t end)
{
  // Create a local filesystem
  ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUri("file:///"));

  // Set up the file info for a single file
  fs::FileInfo file_info(parquet_path, fs::FileType::File);

  // Create a Parquet file format and its fragment scan options
  auto format = std::make_shared<ds::ParquetFileFormat>();
  auto parquet_scan_options = std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();

  // Configure general Parquet reader settings
  auto reader_properties = std::make_shared<parquet::ReaderProperties>(arrow::default_memory_pool());
  reader_properties->set_buffer_size(64 * 1024 * 1024);  // 64 MB buffer size
  reader_properties->enable_buffered_stream();

  // Configure Arrow-specific Parquet reader settings
  auto arrow_reader_props = std::make_shared<parquet::ArrowReaderProperties>();
  arrow_reader_props->set_batch_size(10000);  // default is 64 * 1024
  arrow_reader_props->set_use_threads(true);  // Enable multithreading
  arrow_reader_props->set_pre_buffer(false);  // Disable pre-buffering

  parquet_scan_options->reader_properties = reader_properties;
  parquet_scan_options->arrow_reader_properties = arrow_reader_props;

  arrow::dataset::FileSystemFactoryOptions options;
  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                                          filesystem, {file_info}, format, options));

  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());

  // Create a scanner builder
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(parquet_scan_options));

  // Set a filter, e.g. start <= "INDEX" <= end
  auto filter = cp::and_(cp::greater_equal(cp::field_ref("INDEX"), cp::literal(start)),
                         cp::less_equal(cp::field_ref("INDEX"), cp::literal(end)));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(filter));

  // Specify the columns to read, e.g. "DATA"
  ARROW_RETURN_NOT_OK(scan_builder->Project({"DATA"}));

  // Finish the scanner
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());

  scanner->options()->use_threads = true;      // Enable multithreading for better performance
  scanner->options()->cache_metadata = false;  // Do not cache fragment metadata
  // scanner->options()->batch_readahead = 10;    // Set batch readahead for performance
  // scanner->options()->fragment_readahead = 5;  // Set fragment readahead for performance

  // Return a streaming reader over the filtered data
  return scanner->ToRecordBatchReader();
}
```
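
For context, this is roughly how I consume the returned reader, pulling one record batch at a time instead of materializing a whole table (a minimal sketch; the driver function, path and range are placeholders of mine, only `ReadFilteredParquetRecordReader` comes from the snippet above):

```cpp
arrow::Status ConsumeFiltered()
{
  // Hypothetical path and range, for illustration only
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        ReadFilteredParquetRecordReader("/data/example.parquet", 0, 100000));

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // Process `batch` here and let it go out of scope, so only one
    // batch (plus any readahead) stays resident at a time
  }
  return arrow::Status::OK();
}
```
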
But the reduction in memory usage was less pronounced than I was expecting.
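
For reference, one way to put a number on the peak usage is the default memory pool's high-water mark (a small sketch; the helper name is mine):

```cpp
#include <arrow/memory_pool.h>
#include <iostream>

// Report the current and peak allocation of the default memory pool,
// which is the pool the snippet above allocates from
void ReportPoolUsage()
{
  arrow::MemoryPool *pool = arrow::default_memory_pool();
  std::cout << "bytes allocated now:  " << pool->bytes_allocated() << "\n"
            << "peak bytes allocated: " << pool->max_memory() << std::endl;
}
```
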

I was also able to use the Parquet statistics to determine which row groups I need, and reading those row groups sequentially kept the memory usage under control, but it was slower than using the dataset API.
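
In case it helps to compare, this is roughly what that row-group approach looks like (a minimal sketch using the `parquet::arrow::FileReader` API; it assumes the same `INDEX`/`DATA` columns as above, that `INDEX` is stored as INT64, and the helper name and min/max pruning are mine):

```cpp
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>
#include <parquet/statistics.h>

arrow::Result<std::vector<std::shared_ptr<arrow::Table>>> ReadRowGroupsByStats(
    const std::string &parquet_path, const std::int64_t start, const std::int64_t end)
{
  // Open the file with the low-level parquet::arrow reader
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(parquet_path));
  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(input));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.Build(&reader));

  auto metadata = reader->parquet_reader()->metadata();
  const int index_col = metadata->schema()->ColumnIndex("INDEX");
  const int data_col = metadata->schema()->ColumnIndex("DATA");

  // Use the row-group statistics to keep only row groups whose
  // [min, max] range on INDEX overlaps [start, end]
  std::vector<int> row_groups;
  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
    auto chunk = metadata->RowGroup(rg)->ColumnChunk(index_col);
    if (!chunk->is_stats_set()) {
      row_groups.push_back(rg);  // no stats: read it to be safe
      continue;
    }
    // Assumes INDEX is physically INT64
    auto stats = std::static_pointer_cast<parquet::Int64Statistics>(chunk->statistics());
    if (stats->HasMinMax() && stats->max() >= start && stats->min() <= end) {
      row_groups.push_back(rg);
    }
  }

  // Read the selected row groups one at a time to bound peak memory
  std::vector<std::shared_ptr<arrow::Table>> tables;
  for (int rg : row_groups) {
    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(reader->ReadRowGroup(rg, {data_col}, &table));
    tables.push_back(table);  // or process and discard to keep memory flat
  }
  return tables;
}
```
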

GitHub link: 
https://github.com/apache/arrow/discussions/47003#discussioncomment-13714316
