GitHub user MukundaKatta added a comment to the discussion: Arrow-Datasets C++: 
Scanner::Scan visitor is executing serially despite use_threads=true

Expanding on @KOKOSde's answer with the specific shape of the API:

`Scanner::Scan(visitor)` is single-consumer by design: the visitor is 
serialized across batches regardless of `use_threads`. `use_threads` 
parallelizes everything *upstream* of your visitor (file open, decompression, 
decode), and the threads fan back into a single-consumer pipeline where your 
visitor sees one batch at a time. Your 5-second sleep back-pressures that 
pipeline, so the decode threads finish early and then idle waiting for your 
visitor to drain. That's why you're seeing "ThreadId X, ThreadId X, ThreadId X" 
— there's exactly one consumer thread calling into your visitor.

`ScanBatchesUnordered` is the same contract with order relaxed. Still 
single-consumer.
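
To make the single-consumer shape concrete, here is a toy model in plain C++ with no Arrow dependency. It is only a sketch of the general shape described above, not Arrow's actual implementation: `RunSingleConsumerPipeline` is a hypothetical name, the integers stand in for decoded batches, and the shared queue is an assumption about how the fan-in behaves.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <vector>

// Toy stand-in for the scan pipeline (not Arrow code): `producers` threads
// "decode" integers in parallel and push them into one shared queue, while a
// single consumer thread drains the queue and runs the visitor.
// Returns the set of thread ids the visitor was invoked on.
std::set<std::thread::id> RunSingleConsumerPipeline(int producers,
                                                    int items_per_producer) {
  std::mutex mu;
  std::condition_variable cv;
  std::queue<int> queue;
  int producers_left = producers;
  std::set<std::thread::id> visitor_threads;

  std::thread consumer([&] {
    std::size_t visited = 0;
    for (;;) {
      std::unique_lock<std::mutex> lock(mu);
      cv.wait(lock, [&] { return !queue.empty() || producers_left == 0; });
      if (queue.empty()) break;  // every producer finished and queue drained
      queue.pop();
      lock.unlock();
      // The "visitor": it only ever runs here, on the consumer thread.
      visitor_threads.insert(std::this_thread::get_id());
      ++visited;
    }
    assert(visited ==
           static_cast<std::size_t>(producers) * items_per_producer);
  });

  std::vector<std::thread> workers;
  for (int p = 0; p < producers; ++p) {
    workers.emplace_back([&, p] {
      for (int i = 0; i < items_per_producer; ++i) {
        std::lock_guard<std::mutex> lock(mu);
        queue.push(p * items_per_producer + i);
        cv.notify_one();
      }
      std::lock_guard<std::mutex> lock(mu);
      --producers_left;
      cv.notify_one();
    });
  }
  for (auto& w : workers) w.join();
  consumer.join();
  return visitor_threads;
}
```

Calling `RunSingleConsumerPipeline(4, 25)` returns a set holding exactly one thread id, however many producer threads you start. That is the same picture as the repeated "ThreadId X" lines in the repro.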

### To actually parallelize your processing, decouple it from the scan

Two patterns work; pick whichever fits your code:

**1. Consume into a producer/consumer queue**

```cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include <arrow/util/thread_pool.h>

// Fragment: assumes an enclosing function that returns arrow::Status and
// has a `scanner` (std::shared_ptr<arrow::dataset::Scanner>) in scope.
auto pool = arrow::internal::GetCpuThreadPool();
ARROW_ASSIGN_OR_RAISE(auto scanner_reader, scanner->ToRecordBatchReader());

arrow::Status batch_st;
while (true) {
  std::shared_ptr<arrow::RecordBatch> batch;
  batch_st = scanner_reader->ReadNext(&batch);
  if (!batch_st.ok() || batch == nullptr) break;

  // Fan out the actual work onto the CPU pool.
  batch_st = pool->Spawn([batch] {
    // Your ProcessBatch body; runs on whichever pool thread is free.
    std::cerr << "ThreadId " << std::this_thread::get_id()
              << " processing " << batch->num_rows() << " rows\n";
    std::this_thread::sleep_for(std::chrono::seconds(5));
  });
  // Break instead of returning so already-spawned tasks are still awaited.
  if (!batch_st.ok()) break;
}

// Wait for all spawned tasks to complete before returning.
pool->WaitForIdle();
return batch_st;
```

Now the reader thread is the only one calling `ReadNext`, and the CPU pool runs 
up to `pool->GetCapacity()` visitor bodies in parallel (by default, one per 
hardware thread). You can cap in-flight batches with a counting semaphore (for 
example C++20 `std::counting_semaphore`) if your processing allocates lots of 
memory.
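
One way to cap in-flight batches, sketched in plain C++ without Arrow so it stands alone. The names `InFlightLimiter` and `RunBounded` are hypothetical, the semaphore is hand-rolled from a mutex and condition variable so it also compiles before C++20, and one `std::thread` per batch is only a stand-in for `pool->Spawn`.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore built from a mutex and a condition variable.
class InFlightLimiter {
 public:
  explicit InFlightLimiter(int max_in_flight) : slots_(max_in_flight) {}

  // Blocks until a slot is free, then takes it.
  void Acquire() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return slots_ > 0; });
    --slots_;
  }

  // Returns a slot and wakes one waiter.
  void Release() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      ++slots_;
    }
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int slots_;
};

// Hypothetical driver: the calling thread plays the reader loop and hands
// each "batch" to its own thread (a stand-in for pool->Spawn).
// Returns the highest number of batches that were in flight at once.
int RunBounded(int num_batches, int max_in_flight) {
  InFlightLimiter limiter(max_in_flight);
  std::mutex stats_mu;
  int in_flight = 0;
  int peak = 0;
  std::vector<std::thread> tasks;
  for (int i = 0; i < num_batches; ++i) {
    limiter.Acquire();  // the reader blocks here while the cap is reached
    tasks.emplace_back([&] {
      {
        std::lock_guard<std::mutex> lock(stats_mu);
        peak = std::max(peak, ++in_flight);
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(5));  // fake work
      {
        std::lock_guard<std::mutex> lock(stats_mu);
        --in_flight;
      }
      limiter.Release();
    });
  }
  for (auto& t : tasks) t.join();
  assert(in_flight == 0);
  return peak;
}
```

In the real loop, the `Acquire()` call would sit just before `pool->Spawn(...)` and the `Release()` call at the end of the spawned lambda, so the reader stops pulling batches once `max_in_flight` of them are being processed.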

**2. Use `ScanBatchesAsync` + `VisitAsyncGenerator`**

If you want everything inside the Arrow execution model:

```cpp
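#include <arrow/util/async_generator.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

// Fragment: assumes an enclosing function that returns arrow::Status.
auto pool = arrow::internal::GetCpuThreadPool();
ARROW_ASSIGN_OR_RAISE(auto gen, scanner->ScanBatchesAsync(pool));

// VisitAsyncGenerator with the per-batch work submitted to the CPU pool.
// Assumption: a Future-returning visitor is accepted; check the visitor
// signature VisitAsyncGenerator takes in your Arrow version.
arrow::Future<> done = arrow::VisitAsyncGenerator(
    std::move(gen),
    [pool](arrow::dataset::TaggedRecordBatch tb) -> arrow::Future<> {
      return arrow::DeferNotOk(
          pool->Submit([batch = std::move(tb.record_batch)] {
            // Your per-batch work.
            return arrow::Status::OK();
          }));
    });
done.Wait();
return done.status();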
```

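This keeps the back-pressure inside Arrow's future machinery: the async 
generator won't pull a new batch until a previous `Future` completes, so memory 
stays bounded even if processing is slow. The same gating limits how many 
batches are processed at once, so measure the concurrency you actually get 
before relying on this pattern for CPU-bound work.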

### The I/O thread pool gotcha

One more knob that often gets missed: `ScanOptions::use_threads` controls 
batch-level parallelism, but I/O parallelism is governed by a separate I/O 
thread pool that defaults to 8 threads. If your Parquet files are on a slow or 
remote filesystem, even the decode path can bottleneck there. Raise it with:

```cpp
#include <arrow/io/interfaces.h>

// Returns arrow::Status, so propagate it; tune the value per your NVMe/S3 setup.
ARROW_RETURN_NOT_OK(arrow::io::SetIOThreadPoolCapacity(16));
```

### Which of these you should pick

- `sleep_for(5s)` in your repro is standing in for CPU work → **pattern 1**. 
Most code ends up here.
- Your processing is itself async (network calls, another CPU pool) → **pattern 
2**; the async future chain composes cleanly with your existing futures.
- Processing is fast and per-batch, you're just eating I/O latency → the 
current code is fine; tuning `use_threads` and the I/O pool (above) is all you 
need.

The key mental model: Arrow's Scanner gives you a stream, not a parallel apply. 
Parallel apply is your responsibility using one of the two patterns above.


GitHub link: 
https://github.com/apache/arrow/discussions/49568#discussioncomment-16658184
