Hi there,
I´m a newbie arrow user and I want to develop an application which loads/stores
information from/to arrow. For this purpose, I started a very simple Kotlin
application that first writes an arrow file and later reads the information
from that arrow file. It makes use of the Arrow Java API (See code below).
This application sets an schema with 3 fields: 1 Float, 1 Int and 1 Varchar.
Later, it writes 3 batches of ten elements (in Section Writer). In the last
section (called Reader), It tries to load the information that was stored in
the Arrow file.
The application works properly if I avoid the usage of dictionaries for storing
and loading Strings in Utf-8 format. However, If I use dictionaries I get an
exception (see below) while trying to load the first batch by using the
ArrowFileReader.loadRecordsBatch() function (Line annoted with //ERROR HERE
tag.).
Exception in thread "main" java.lang.IllegalArgumentException: not all nodes
and buffers were consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34
448..458]]
at
org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
at
org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
at
org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
at
org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
at
ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
I´m not sure if I´m doing something wrong or it is just an issue, could anyone
assist me with this issue?
Thanks in advance,
Antonio Vilches.
fun main(args: Array<String>) {
println("Writing and reading an apache arrow file")
val numRecords = (10 * 3) // 30 in total
val initialBatchSize = 10
val recordsPerBatch = initialBatchSize
val numBatches = if ((numRecords % recordsPerBatch) == 0){
numRecords / recordsPerBatch
}else{
numRecords / recordsPerBatch + 1
}
println("Reading and writing $numRecords records in $numBatches batches.")
//
*****************************************************************************************************************
// Generate dictionary
*
//
*****************************************************************************************************************
val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
vector.setInitialCapacity(4)
vector.allocateNew()
vector.set(0, "a".toByteArray(Charsets.UTF_8))
vector.set(1, "b".toByteArray(Charsets.UTF_8))
vector.set(2, "c".toByteArray(Charsets.UTF_8))
vector.set(3, "d".toByteArray(Charsets.UTF_8))
vector.valueCount = 4
val dictionary = Dictionary(vector,
DictionaryEncoding("Tags".hashCode().toLong(), false, null))
dictionaryProvider.put(dictionary)
//
*****************************************************************************************************************
// Create schema
*
//
*****************************************************************************************************************
val filePath = "./example.arrow"
//Create Fields
val temperature = Field("temperature",
FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
null)
val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32,
true)), null)
val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/
dic.encoding, null), null)
//Create schema
val builder = mutableListOf<Field>()
builder.add(temperature)
builder.add(humidity)
builder.add(tag)
val schema = Schema(builder, null)
//
*****************************************************************************************************************
// Write to Arrow
*
//
*****************************************************************************************************************
val fileToWrite = File(filePath)
val writeStream = fileToWrite.outputStream()
val schemaRoot = VectorSchemaRoot.create(schema,
RootAllocator(Long.MAX_VALUE))
val writer = ArrowFileWriter(schemaRoot,
/*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider,
writeStream.channel)
writer.start()
schemaRoot.rowCount = recordsPerBatch
for (batch in 0 until numBatches){
val numItemsBatch = min(recordsPerBatch, numRecords - batch *
recordsPerBatch)
schemaRoot.rowCount = numItemsBatch
schemaRoot.schema.fields.forEach {
when(it.type){
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
val vector = schemaRoot.getVector(it.name) as Float8Vector
vector.setInitialCapacity(recordsPerBatch)
vector.allocateNew(recordsPerBatch)
for (i in 0 until numItemsBatch){
vector.set(i, i.toDouble())
}
vector.valueCount = numItemsBatch
}
ArrowType.Int(32, true) -> {
val vector = schemaRoot.getVector(it.name) as IntVector
vector.setInitialCapacity(recordsPerBatch)
vector.allocateNew(recordsPerBatch)
for (i in 0 until numItemsBatch){
vector.set(i, i.toInt())
}
vector.valueCount = numItemsBatch
}
ArrowType.Utf8() -> {
val vec = schemaRoot.getVector(it.name) as VarCharVector
vec.setInitialCapacity(recordsPerBatch)
vec.allocateNew()
//val encoded = DictionaryEncoder.encode(vec,
dictionaryProvider.lookup("Tags".hashCode().toLong()))
for (i in 0 until recordsPerBatch) {
when(i % 4){
0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
}
}
vec.valueCount = recordsPerBatch
}
}
}
writer.writeBatch()
}
writer.end()
writer.close()
// Need to close dictionary vectors
for (id in dictionaryProvider.dictionaryIds) {
dictionaryProvider.lookup(id).vector.close()
}
//
*****************************************************************************************************************
// Read from Arrow
*
//
*****************************************************************************************************************
//Accum results
var accumDouble = 0.0
var accumInt = 0L
// Setting reading
val fileToRead = File(filePath)
val readStream = fileToRead.inputStream()
val reader = ArrowFileReader(readStream.channel,
RootAllocator(Long.MAX_VALUE))
//println("Reading the arrow file : $filePath")
val readRoot = reader.vectorSchemaRoot
//val readSchema = readRoot.schema
val arrowBlocks = reader.recordBlocks
reader.recordBlocks.forEachIndexed { index, arrowBlock ->
reader.loadRecordBatch(arrowBlock) // ERROR HERE
println("Reading Block[$index]: ${readRoot.rowCount} elements.")
readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
val minorType = fieldVector.minorType
when(minorType){
Types.MinorType.FLOAT8 -> {
val vecDouble = fieldVector as Float8Vector
val cap = vecDouble.valueCapacity
var address = vecDouble.dataBufferAddress
for (i in 0 until vecDouble.valueCapacity){
accumDouble += vecDouble.get(i)
}
}
Types.MinorType.INT -> {
val vecDouble = fieldVector as IntVector
val cap = vecDouble.valueCapacity
var address = vecDouble.dataBufferAddress
for (i in 0 until vecDouble.valueCapacity){
accumInt += vecDouble.get(i)
}
}
Types.MinorType.VARCHAR -> {
val vec = fieldVector as VarCharVector
val cap = vec.valueCapacity
var address = vec.dataBufferAddress
for (i in 0 until vec.valueCapacity){
println("Reading tag[$i]
${vec.get(i).toString(Charsets.UTF_8)}")
}
}
}
}
}
reader.close()
println("Double accum: $accumDouble")
println("Long accum: $accumInt")
}
/**
* This is a helper function to create VarCharVector for dictionary purposes.
*/
fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name,
allocator, null) as VarCharVector