I am using Apache Arrow to transfer data between Spark (Scala) and
TensorFlow (Python).
In Spark, I have an Array[Row], and I send it over a socket with the
following code:
```
val rows_array: Array[Row] = df.rdd.collect()
val ss: ServerSocket = new ServerSocket(port)
val socket = ss.accept()
val iter = rows_array.iterator
// Group the iterator into batches; batchIter is an Iterator[Iterator[Row]]
val batchIter = new BatchIterator(iter, arrowRecordBatchSize)
val root = new VectorSchemaRoot(new Schema(fields), vectors,
  arrowRecordBatchSize)
val out = new DataOutputStream(socket.getOutputStream)
val writer = new ArrowStreamWriter(root, null, out)
writer.start()
sendRecordIterator(root, writer, batchIter)
```
sendRecordIterator is defined as follows:
```
def sendRecordIterator(root: VectorSchemaRoot,
                       writer: ArrowStreamWriter,
                       inputIterator: Iterator[Iterator[Row]]): Unit = {
  // Each inner iterator holds one batch of rows
  try {
    val arrowWriter = MyArrowWriter.create(root)
    while (inputIterator.hasNext) {
      val nextBatch = inputIterator.next()
      while (nextBatch.hasNext) {
        arrowWriter.write(nextBatch.next())
      }
      arrowWriter.finish()
      writer.writeBatch()
      arrowWriter.reset()
    }
    writer.end()
  } finally {
    root.close()
  }
}
```
In Python, I build a dataset with the following code:
```
ds = arrow_io.ArrowStreamDataset(
    [endpoint],
    columns=my_columns,
    output_types=my_types,
    output_shapes=my_shapes,
    batch_size=arrowRecordBatchSize,
    batch_mode='keep_remainder')
```
Then I convert the dataset to NumPy and iterate over it:
```
ds_numpy = tfds.as_numpy(ds)
my_list = list()
for x in ds_numpy:
my_list.append(x)
```
I sent the data with several different batch sizes (arrowRecordBatchSize), and
the measured transfer times vary considerably:
> arrowRecordBatchSize = 1,000 (about 8 kB) =======> 161 s
> arrowRecordBatchSize = 50,000 (about 400 kB) =======> 63 s
> arrowRecordBatchSize = 100,000 (about 800 kB) =======> 65 s
> arrowRecordBatchSize = 1,000,000 (about 8 MB) =======> 108 s
> arrowRecordBatchSize = 10,000,000 (about 80 MB) =======> 113 s
> arrowRecordBatchSize = 50,000,000 (about 400 MB) =======> 112 s
So, how does batch size affect streaming read and write performance in Apache
Arrow? Are there any best practices for batch size?
PS: I noticed that the sender seems to use NIO. Could that explain this
behavior?
PPS: I asked the same question on Stack Overflow, where the code may be easier
to read:
https://stackoverflow.com/questions/61339374/how-does-batch-size-affect-streaming-read-and-write-performance-in-apache-arrow
Best regards,
maqy