GitHub user MukundaKatta added a comment to the discussion: Arrow-Datasets C++:
Scanner::Scan visitor is executing serially despite use_threads=true
Expanding on @KOKOSde's answer with the specific shape of the API:
`Scanner::Scan(visitor)` is single-consumer by design: the visitor is
serialized across batches regardless of `use_threads`. `use_threads`
parallelizes everything *upstream* of your visitor (file open, decompression,
decode), and the threads fan back into a single-consumer pipeline where your
visitor sees one batch at a time. Your 5-second sleep back-pressures that
pipeline, so the decode threads finish early and then idle waiting for your
visitor to drain. That's why you're seeing "ThreadId X, ThreadId X, ThreadId X"
— there's exactly one consumer thread calling into your visitor.
`ScanBatchesUnordered` is the same contract with order relaxed. Still
single-consumer.
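For reference, the shape of that contract (a minimal sketch; `scanner` is the `Scanner` from your repro):
```cpp
#include <arrow/dataset/scanner.h>

// Scan() drives the visitor from a single consumer thread: it is invoked
// once per batch, never concurrently, whatever use_threads is set to.
ARROW_RETURN_NOT_OK(scanner->Scan([](arrow::dataset::TaggedRecordBatch tb) {
  // ... per-batch work happens serially here ...
  return arrow::Status::OK();
}));
```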
### To actually parallelize your processing, decouple it from the scan
Two patterns work; pick per taste:
**1. Consume into a producer/consumer queue**
```cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/thread_pool.h>

auto* pool = arrow::internal::GetCpuThreadPool();
ARROW_ASSIGN_OR_RAISE(auto scanner_reader, scanner->ToRecordBatchReader());

arrow::Status batch_st;
while (true) {
  std::shared_ptr<arrow::RecordBatch> batch;
  batch_st = scanner_reader->ReadNext(&batch);
  if (!batch_st.ok() || batch == nullptr) break;
  // Fan out the actual work onto the CPU pool.
  ARROW_RETURN_NOT_OK(pool->Spawn([batch] {
    // Your ProcessBatch body; runs on whichever pool thread is free.
    std::cerr << "ThreadId " << std::this_thread::get_id() << " processing "
              << batch->num_rows() << " rows\n";
    std::this_thread::sleep_for(std::chrono::seconds(5));
  }));
}
// Wait for spawned tasks to drain before returning. Note this waits for the
// whole global pool, which is fine if these are its only tasks.
pool->WaitForIdle();
return batch_st;
```
Now the reader thread is the only one calling `ReadNext`, and the CPU pool runs
up to one batch body per pool thread in parallel. If your processing allocates
a lot of memory, cap the number of in-flight batches with a semaphore, as
sketched below.
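A minimal sketch of that cap with a C++20 `std::counting_semaphore`; the limit of 8 and the `ProcessBatch` helper are placeholders, and error paths are elided:
```cpp
#include <semaphore>

// At most 8 batches queued or executing at once; tune to your memory budget.
std::counting_semaphore<> in_flight(8);

// Inside the read loop from pattern 1:
in_flight.acquire();  // blocks the reader thread while 8 batches are in flight
ARROW_RETURN_NOT_OK(pool->Spawn([batch, &in_flight] {
  ProcessBatch(batch);  // placeholder for your per-batch work
  in_flight.release();  // free a slot so the reader can pull the next batch
}));
```
Acquiring on the reader thread pushes the back-pressure up into the scanner itself, so decoded batches stop piling up in memory.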
**2. Use `ScanBatchesAsync` + the async-generator utilities**
If you want everything inside the Arrow execution model, map each batch onto
the CPU pool and let a readahead generator bound the in-flight count. (Plain
`VisitAsyncGenerator` won't do it: its visitor is `Status`-returning and runs
serially.)
```cpp
#include <arrow/util/async_generator.h>

auto* pool = arrow::internal::GetCpuThreadPool();
ARROW_ASSIGN_OR_RAISE(auto gen, scanner->ScanBatchesAsync());
// Move each batch's work onto the CPU pool. The map fn returns a Future,
// so the scan side never blocks on your processing.
auto mapped = arrow::MakeMappedGenerator(
    std::move(gen), [pool](const arrow::dataset::TaggedRecordBatch& tb) {
      return arrow::DeferNotOk(pool->Submit([batch = tb.record_batch] {
        // Your per-batch work.
        return batch;
      }));
    });
// Keep up to GetCapacity() batches in flight; this is what bounds memory.
// (Readahead issues overlapping pulls, which the scan generator must tolerate.)
auto readahead =
    arrow::MakeReadaheadGenerator(std::move(mapped), pool->GetCapacity());
arrow::Future<> done = arrow::DiscardAllFromAsyncGenerator(std::move(readahead));
done.Wait();
return done.status();
```
This keeps the back-pressure inside Arrow's future machinery: the readahead
generator never holds more than `GetCapacity()` batches in flight, so memory
stays bounded even if processing is slow.
### The separate I/O thread pool gotcha
One more knob that often gets missed: `ScanOptions::use_threads` controls
CPU-side parallelism, but I/O parallelism is governed by a separate I/O thread
pool that defaults to 8 threads. If your Parquet files are on a slow or remote
filesystem, the decode path can bottleneck there. Check the current size and
raise it:
```cpp
#include <arrow/io/interfaces.h>

int n = arrow::io::GetIOThreadPoolCapacity();  // 8 unless already tuned
ARROW_RETURN_NOT_OK(
    arrow::io::SetIOThreadPoolCapacity(16));  // size per your NVMe/S3 setup
```
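If you'd rather not change code, the `ARROW_IO_THREADS` environment variable sets the same default; as far as I know it only takes effect if set before the pool is first used.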
### Which one to pick
- `sleep_for(5s)` in your repro is standing in for CPU work → **pattern 1**.
Most code ends up here.
- Your processing is itself async (network calls, another CPU pool) → **pattern
2**; the async future chain composes cleanly with your existing futures.
- Processing is fast and per-batch and you're just eating I/O latency → the
current code is fine; keeping `use_threads=true` and sizing the I/O pool
(above) is all you need.
The key mental model: Arrow's Scanner gives you a stream, not a parallel apply.
Parallel apply is your responsibility using one of the two patterns above.
GitHub link:
https://github.com/apache/arrow/discussions/49568#discussioncomment-16658184