Hi,
I am using the following Java code to stream a row-based result from Dremio via Apache Arrow Flight:
FlightInfo flightInfo = client.getInfo(FlightDescriptor.command(query.getBytes(UTF_8)), bearerToken);
try (FlightStream stream = client.getStream(flightInfo.getEndpoints().get(0).getTicket(), bearerToken);
     VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(stream.getSchema(), bufferAllocator)) {
    final VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
    while (stream.next()) {
        if (!stream.hasRoot()) {
            break;
        }
        // Unload the incoming batch from the stream's root, process the rows,
        // then transfer the batch into the local root.
        try (ArrowRecordBatch currentRecordBatch = new VectorUnloader(stream.getRoot()).getRecordBatch()) {
            processRows(stream.getRoot());
            vectorLoader.load(currentRecordBatch);
        }
    }
}
The code structure is basically the same as in Dremio's Flight client examples:
https://github.com/dremio-hub/arrow-flight-client-examples/blob/8e54411ea57f98d30ec81ec62bacb9d129a2b23f/java/src/main/java/com/adhoc/flight/client/AdhocFlightClient.java#L349-L378
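For context, the allocator and client are set up roughly like this (simplified; the host, port, and credentials are placeholders):

import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Unbounded root allocator, as in the examples.
BufferAllocator bufferAllocator = new RootAllocator(Long.MAX_VALUE);
FlightClient client = FlightClient.builder(bufferAllocator, Location.forGrpcTls("dremio-host", 32010))
        .build();
CredentialCallOption bearerToken = client.authenticateBasicToken("user", "password").get();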
My problem is that the amount of direct memory that is allocated under the hood depends on the width of the rows. All batches but the last one have a fixed count of 3968 rows, but the wider the rows are, the more direct memory is needed. I am observing the allocation in java.nio.Bits.tryReserveMemory (from JDK 11.0.18):
private static boolean tryReserveMemory(long size, int cap) {
    // -XX:MaxDirectMemorySize limits the total capacity rather than the
    // actual memory usage, which will differ when buffers are page
    // aligned.
    long totalCap;
    while (cap <= MAX_MEMORY - (totalCap = TOTAL_CAPACITY.get())) {
        if (TOTAL_CAPACITY.compareAndSet(totalCap, totalCap + cap)) {
            RESERVED_MEMORY.addAndGet(size);
            COUNT.incrementAndGet();
            return true;
        }
    }
    return false;
}
where the value of MAX_MEMORY corresponds to the -XX:MaxDirectMemorySize setting; RESERVED_MEMORY keeps growing until it eventually exceeds MAX_MEMORY.
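For what it's worth, the same counters are visible from inside the JVM via the standard BufferPoolMXBean API (for the "direct" pool, count, total capacity, and memory used correspond to COUNT, TOTAL_CAPACITY, and RESERVED_MEMORY above); roughly:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
    if ("direct".equals(pool.getName())) {
        System.out.printf("direct buffers: count=%d, totalCapacity=%d, memoryUsed=%d%n",
                pool.getCount(), pool.getTotalCapacity(), pool.getMemoryUsed());
    }
}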
For a stream of 8 KB rows, I need about 200 MB of direct memory, and for 64 KB rows more than 1 GB. So I am running out of direct memory fairly quickly, especially when multiple such streams need to be processed.
I guess that the memory requirement is roughly the product of the batch size and the row width; since the batch size (in rows) apparently is fixed, the requirement grows with the row width.
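If that is right, the numbers roughly add up: one batch of 3968 rows at 64 KB per row comes to about

3968 rows × 64 KB/row ≈ 248 MB

(about 31 MB at 8 KB per row), so the direct memory I observe would correspond to several such batches being held at once, presumably the batch currently being processed plus batches buffered inside the FlightStream.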
Is there any way to get this working under memory constraints?
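The only knob I am aware of on the Arrow side is putting a limit on the allocator, e.g.:

BufferAllocator bufferAllocator = new RootAllocator(256L * 1024 * 1024); // 256 MB limit

but if I understand correctly, the direct memory reserved through java.nio.Bits is allocated by gRPC/Netty underneath the FlightStream, not through this allocator, so a limit here would only make my own allocations fail rather than throttle the incoming batches.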
