GitHub user moba15 edited a discussion: Arrow-Datasets C++: Scanner::Scan visitor is executing serially despite use_threads=true
Hi everyone,
I am working with the Apache Arrow C++ Dataset API to scan multiple Parquet
files. My goal is to process RecordBatches in parallel using a callback
function without materializing the entire table at once.
Following the Dataset Tutorial, I am using the `Scan` method. According to the documentation:

> If multiple threads are used (via use_threads), the visitor will be invoked from those threads and is responsible for any synchronization.
However, in my implementation, the visitor function is called strictly in
order—one call only begins after the previous one finishes—even though
`use_threads` is set to `true`. I have also tried `ScanBatchesUnordered`, but I
am seeing similar serial behavior.
Minimal Working Example:
```C++
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

arrow::Status ProcessBatch(const arrow::dataset::TaggedRecordBatch& tagged_batch) {
  std::cerr << "ThreadId " << std::this_thread::get_id() << " got batch with "
            << tagged_batch.record_batch->num_rows() << " rows at "
            << std::chrono::system_clock::now() << "\n";
  // Wait: simulate processing time
  std::this_thread::sleep_for(std::chrono::seconds(5));
  return arrow::Status::OK();
}

arrow::Status ScanWholeDataset(
    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
    const std::shared_ptr<arrow::dataset::FileFormat>& format,
    const std::string& base_dir) {
  // Create custom scan options
  auto customOption = std::make_shared<arrow::dataset::ScanOptions>();
  customOption->use_threads = true;
  customOption->fragment_readahead = 20;

  arrow::fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ARROW_ASSIGN_OR_RAISE(
      auto factory,
      arrow::dataset::FileSystemDatasetFactory::Make(
          filesystem, selector, format,
          arrow::dataset::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());

  arrow::dataset::ScannerBuilder scan_builder(dataset, customOption);
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder.Finish());

  // Call Scan with the callback function (and propagate its status)
  ARROW_RETURN_NOT_OK(scanner->Scan(ProcessBatch));
  return arrow::Status::OK();
}

arrow::Status Test() {
  ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
  std::string base_path = "xxx";
  std::string root_path;
  std::string uri = "file://xxx";
  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::FileSystemFromUri(uri, &root_path));
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  ARROW_RETURN_NOT_OK(ScanWholeDataset(fs, format, base_path));
  return arrow::Status::OK();
}

int main() {
  auto status = Test();
  if (!status.ok()) {
    std::cerr << "Error: " << status.message() << std::endl;
    return 1;
  }
  return 0;
}
```
Observed Behavior
When running this against 6 Parquet files (approx. 5 GiB total), the timestamps in the output show a consistent 5-second gap between batches, and every call reports the same thread id.
```
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:04.164492317
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:09.165123649
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:14.167036139
```
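For contrast, here is the kind of overlap I expected. This is a plain `std::thread` sketch, nothing Arrow-specific; `RunConcurrentVisitors` is just an illustration I wrote: with genuine parallelism, n sleeping "visitor" calls finish in roughly one sleep interval of wall time, not n intervals, and report distinct thread ids.

```C++
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

// Launch `n` concurrent "visitor" calls that each sleep for `work`;
// returns the total wall-clock time in seconds. With real parallelism
// the total stays close to one `work` interval, not n * work.
double RunConcurrentVisitors(int n, std::chrono::milliseconds work) {
  auto start = std::chrono::steady_clock::now();
  std::vector<std::thread> visitors;
  for (int i = 0; i < n; ++i) {
    visitors.emplace_back([work] {
      std::cerr << "ThreadId " << std::this_thread::get_id() << "\n";
      std::this_thread::sleep_for(work);
    });
  }
  for (auto& t : visitors) t.join();
  return std::chrono::duration<double>(std::chrono::steady_clock::now() - start)
      .count();
}
```

Calling `RunConcurrentVisitors(3, std::chrono::milliseconds(500))` takes roughly 0.5 seconds of wall time; the serial behavior I am seeing from `Scan` corresponds to about 1.5 seconds.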
Apache Arrow version: 22
Am I missing a configuration step in ScanOptions or ScannerBuilder to actually
trigger parallel execution of the visitor? Is there a preferred way to handle
parallel callbacks in the Dataset API?
Thanks for your help
GitHub link: https://github.com/apache/arrow/discussions/49568