GitHub user aavbsouza added a comment to the discussion: It is possible to
reduce peak memory usage when using datasets (to use predicate pushdown) when
reading single parquet files
Hello @adamreeve, thanks for the input. I have tried changing the code to use
`ParquetFragmentScanOptions`, as shown in this snippet:
```cpp
#include <cstdint>
#include <memory>
#include <string>

#include <arrow/api.h>
#include <arrow/compute/expression.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/api.h>
#include <parquet/properties.h>

namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace fs = arrow::fs;

arrow::Result<std::shared_ptr<arrow::RecordBatchReader>>
ReadFilteredParquetRecordReader(const std::string &parquet_path,
                                const std::int64_t start,
                                const std::int64_t end)
{
// Create a local filesystem
ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUri("file:///"));
// Set up file selector for a single file
fs::FileInfo file_info(parquet_path, fs::FileType::File);
// Create a Parquet file format
auto format = std::make_shared<ds::ParquetFileFormat>();
auto parquet_scan_options =
std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
// Configure general Parquet reader settings
auto reader_properties =
std::make_shared<parquet::ReaderProperties>(arrow::default_memory_pool());
reader_properties->set_buffer_size(64*1024*1024); // 64 MB buffer size
reader_properties->enable_buffered_stream();
// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = std::make_shared<parquet::ArrowReaderProperties>();
arrow_reader_props->set_batch_size(10000); // default is 64 * 1024
arrow_reader_props->set_use_threads(true); // Enable multithreading
arrow_reader_props->set_pre_buffer(false); // Disable pre-buffering
parquet_scan_options->reader_properties = reader_properties;
parquet_scan_options->arrow_reader_properties = arrow_reader_props;
auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
arrow::dataset::FileSystemFactoryOptions options;
ARROW_ASSIGN_OR_RAISE(auto factory,
arrow::dataset::FileSystemDatasetFactory::Make(
filesystem, {file_info}, format,
options));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// Create a scanner builder
ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(parquet_scan_options));
// Set a filter: e.g., column start <= "INDEX" <= end
auto filter = cp::and_(cp::greater_equal(cp::field_ref("INDEX"),
cp::literal(start)),
cp::less_equal(cp::field_ref("INDEX"),
cp::literal(end)));
ARROW_RETURN_NOT_OK(scan_builder->Filter(filter));
// Specify the columns to read, e.g., "DATA"
ARROW_RETURN_NOT_OK(scan_builder->Project({"DATA"}));
// Finish the scanner
ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
scanner->options()->use_threads = true; // Enable multithreading for better performance
scanner->options()->cache_metadata = false; // Do not cache fragment metadata between scans
// scanner->options()->batch_readahead = 10; // Set batch readahead for performance
// scanner->options()->fragment_readahead = 5; // Set fragment readahead for performance
// Read the filtered table
return scanner->ToRecordBatchReader();
}
```
However, the reduction in memory usage was less pronounced than I was expecting.
I was able to use the Parquet statistics to determine which row groups I need, and
reading those row groups sequentially kept the memory usage under control, but it
was slower than using the dataset API.
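For reference, here is a rough sketch of that row-group approach, assuming the same
`INDEX` filter column, `DATA` data column and int64 statistics as in the snippet
above (illustrative only, not the exact code I used):
```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/metadata.h>
#include <parquet/statistics.h>

// Use the "INDEX" column statistics to pick the row groups that overlap
// [start, end], then read only the "DATA" column from those groups.
// Column names and the int64 statistics type are assumptions from above.
arrow::Result<std::shared_ptr<arrow::Table>> ReadRowGroupsByStats(
    const std::string &parquet_path, std::int64_t start, std::int64_t end)
{
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(parquet_path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));

  auto metadata = reader->parquet_reader()->metadata();
  const int index_col = metadata->schema()->ColumnIndex("INDEX");
  const int data_col = metadata->schema()->ColumnIndex("DATA");

  std::vector<int> row_groups;
  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
    auto stats = metadata->RowGroup(rg)->ColumnChunk(index_col)->statistics();
    if (stats == nullptr || !stats->HasMinMax()) {
      row_groups.push_back(rg);  // No statistics: keep the group to be safe
      continue;
    }
    auto typed = std::static_pointer_cast<parquet::Int64Statistics>(stats);
    // Keep the row group if its [min, max] range overlaps [start, end]
    if (typed->min() <= end && typed->max() >= start) {
      row_groups.push_back(rg);
    }
  }

  // Reading only the selected row groups (and only the DATA column) limits
  // the column chunks that have to be decoded at once.
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadRowGroups(row_groups, {data_col}, &table));
  return table;
}
```
Reading the selected groups one at a time with `ReadRowGroup` in a loop bounds the
peak memory even more tightly, at the cost of concatenating the results afterwards.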
GitHub link:
https://github.com/apache/arrow/discussions/47003#discussioncomment-13714316