Hi,
I am using Apache Arrow in Java. I want to append a stream of record batches to
an already existing file.
That is, I have a fileOutputStream. Using ArrowStreamWriter, I wrote some
batches to this fileOutputStream. Then I close the fileOutputStream and open a
new one on the same file in append mode to write some more batches.
The issue is that the metadata (the schema message) gets written again, so
ArrowStreamReader is not able to read all the batches. How can I append a
stream of record batches to an existing file correctly, without the metadata
being repeated in between?
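For context, I read the file back roughly like this (an illustrative sketch; the
method name, file name, and allocator setup are placeholders, not my exact reader
code):

import java.io.FileInputStream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public void readBack(String filename) throws Exception {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         FileInputStream fileInputStream = new FileInputStream(filename);
         ArrowStreamReader reader = new ArrowStreamReader(fileInputStream, allocator)) {
        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
        int batches = 0;
        // loadNextBatch() returns false at the end-of-stream marker written by end(),
        // so the batches appended by the second writer are never reached
        while (reader.loadNextBatch()) {
            batches++;
        }
        System.out.println("Read " + batches + " batches, schema: " + readRoot.getSchema());
    }
}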
Below is the relevant part of the Java code I am currently using on the write side:
public void setupWrite(String filename, boolean useCustom) throws Exception {
    File arrowFile = validateFile(filename, false);
    this.fileOutputStream = new FileOutputStream(arrowFile);
    Schema schema = makeSchema();
    this.root = VectorSchemaRoot.create(schema, this.ra);
    DictionaryProvider.MapDictionaryProvider provider =
        new DictionaryProvider.MapDictionaryProvider();
    this.arrowStreamWriter = new ArrowStreamWriter(root, provider,
        Channels.newChannel(this.fileOutputStream));
    // Just to show some stuff about the schema and layout
    System.out.println("Schema/Layout: ");
    for (Field field : root.getSchema().getFields()) {
        FieldVector vector = root.getVector(field.getName());
        showFieldLayout(field, vector);
    }
    System.out.println("Generated " + this.entries + " data entries , batch size "
        + batchSize + " usingCustomWriter: " + useCustom + " useNullValues "
        + this.useNullValues);
    // writing logic starts here
    // arrowFileWriter.start();
    arrowStreamWriter.start();
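    // start() writes the schema (and dictionary) metadata at the head of the stream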
    for (int i = 0; i < this.entries;) {
        int toProcessItems = Math.min(this.batchSize, this.entries - i);
        // set the batch row count
        root.setRowCount(toProcessItems);
        for (Field field : root.getSchema().getFields()) {
            FieldVector vector = root.getVector(field.getName());
            // System.out.println(vector.getMinorType());
            switch (vector.getMinorType()) {
                case INT:
                    writeFieldInt(vector, i, toProcessItems);
                    break;
                case BIGINT:
                    writeFieldLong(vector, i, toProcessItems);
                    break;
                case FLOAT4:
                    writeFieldFloat4(vector, i, toProcessItems);
                    break;
                case VARCHAR:
                    writeFieldVarchar(vector, i, toProcessItems);
                    break;
                case DATEDAY:
                    writeFieldDate(vector, i, toProcessItems);
                    break;
                // case VARBINARY:
                //     writeFieldVarBinary(vector, i, toProcessItems);
                //     break;
                default:
                    throw new Exception(" Not supported yet type: " + vector.getMinorType());
            }
        }
        // arrowFileWriter.writeBatch();
        arrowStreamWriter.writeBatch();
        i += toProcessItems;
    }
    arrowStreamWriter.end();
    arrowStreamWriter.close();
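    // end() has written the end-of-stream marker, so a reader treats this point as the end of the stream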
    fileOutputStream.flush();
    fileOutputStream.close();

    this.fileOutputStream = new FileOutputStream(arrowFile, true);
    this.arrowStreamWriter = new ArrowStreamWriter(root, provider,
        this.fileOutputStream.getChannel());
    arrowStreamWriter.start();
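    // this second start() writes the schema message again, right after the first
    // stream's end-of-stream marker -- this is the repeated metadata I want to avoid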
    for (int i = 0; i < this.entries;) {
        int toProcessItems = Math.min(this.batchSize, this.entries - i);
        // set the batch row count
        root.setRowCount(toProcessItems);
        for (Field field : root.getSchema().getFields()) {
            FieldVector vector = root.getVector(field.getName());
            // System.out.println(vector.getMinorType());
            switch (vector.getMinorType()) {
                case INT:
                    writeFieldInt(vector, i, toProcessItems);
                    break;
                case BIGINT:
                    writeFieldLong(vector, i, toProcessItems);
                    break;
                case FLOAT4:
                    writeFieldFloat4(vector, i, toProcessItems);
                    break;
                case VARCHAR:
                    writeFieldVarchar(vector, i, toProcessItems);
                    break;
                case DATEDAY:
                    writeFieldDate(vector, i, toProcessItems);
                    break;
                // case VARBINARY:
                //     writeFieldVarBinary(vector, i, toProcessItems);
                //     break;
                default:
                    throw new Exception(" Not supported yet type: " + vector.getMinorType());
            }
        }
        arrowStreamWriter.writeBatch();
        i += toProcessItems;
    }
    arrowStreamWriter.end();
    arrowStreamWriter.close();
    fileOutputStream.flush();
    fileOutputStream.close();
}
Thanks,
Priyanshu