Hi there,

I´m a newbie arrow user and I want to develop an application which loads/stores 
information from/to arrow. For this purpose, I started a very simple Kotlin 
application that first writes an arrow file and later reads the information
from that arrow file. It makes use of the Arrow Java API (See code below).
This application sets an schema with 3 fields: 1 Float, 1 Int and 1 Varchar. 
Later, it writes 3 batches of ten elements (in Section Writer). In the last 
section (called Reader), It tries to load the information that was stored in 
the Arrow file.

The application works properly if I avoid the usage of dictionaries for storing 
and loading Strings in Utf-8 format. However, If I use dictionaries I get an 
exception (see below) while trying to load the first batch by using the 
ArrowFileReader.loadRecordsBatch() function (Line annoted with //ERROR HERE 
tag.).

Exception in thread "main" java.lang.IllegalArgumentException: not all nodes 
and buffers were consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34 
448..458]]
                at 
org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
                at 
org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
                at 
org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
                at 
org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
                at 
ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)

I´m not sure if I´m doing something wrong or it is just an issue, could anyone 
assist me with this issue?
Thanks in advance,
Antonio Vilches.




fun main(args: Array<String>) {

    println("Writing and reading an apache arrow file")
    val numRecords = (10 * 3) // 30 in total
    val initialBatchSize = 10


    val recordsPerBatch = initialBatchSize

    val numBatches  = if ((numRecords % recordsPerBatch) == 0){
        numRecords / recordsPerBatch
    }else{
        numRecords / recordsPerBatch + 1
    }

    println("Reading and writing $numRecords records in $numBatches batches.")
    // 
*****************************************************************************************************************
    // Generate dictionary                                                      
                                       *
    // 
*****************************************************************************************************************
    val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
    val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
    vector.setInitialCapacity(4)
    vector.allocateNew()

    vector.set(0, "a".toByteArray(Charsets.UTF_8))
    vector.set(1, "b".toByteArray(Charsets.UTF_8))
    vector.set(2, "c".toByteArray(Charsets.UTF_8))
    vector.set(3, "d".toByteArray(Charsets.UTF_8))

    vector.valueCount = 4
    val dictionary = Dictionary(vector, 
DictionaryEncoding("Tags".hashCode().toLong(), false, null))
    dictionaryProvider.put(dictionary)

    // 
*****************************************************************************************************************
    // Create schema                                                            
                                       *
    // 
*****************************************************************************************************************
    val filePath = "./example.arrow"

    //Create Fields
    val temperature = Field("temperature", 
FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), 
null)
    val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32, 
true)), null)
    val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
    val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/ 
dic.encoding, null), null)

    //Create schema
    val builder = mutableListOf<Field>()
    builder.add(temperature)
    builder.add(humidity)
    builder.add(tag)
    val schema = Schema(builder, null)

    // 
*****************************************************************************************************************
    // Write to Arrow                                                           
                                       *
    // 
*****************************************************************************************************************

    val fileToWrite = File(filePath)
    val writeStream = fileToWrite.outputStream()
    val schemaRoot = VectorSchemaRoot.create(schema, 
RootAllocator(Long.MAX_VALUE))
    val writer = ArrowFileWriter(schemaRoot, 
/*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider, 
writeStream.channel)

    writer.start()

    schemaRoot.rowCount = recordsPerBatch
    for (batch in 0 until numBatches){
        val numItemsBatch = min(recordsPerBatch, numRecords - batch * 
recordsPerBatch)
        schemaRoot.rowCount = numItemsBatch
        schemaRoot.schema.fields.forEach {
            when(it.type){
                ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
                    val vector = schemaRoot.getVector(it.name) as Float8Vector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toDouble())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Int(32, true) -> {
                    val vector = schemaRoot.getVector(it.name) as IntVector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toInt())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Utf8() -> {
                    val vec = schemaRoot.getVector(it.name) as VarCharVector
                    vec.setInitialCapacity(recordsPerBatch)
                    vec.allocateNew()

                    //val encoded = DictionaryEncoder.encode(vec, 
dictionaryProvider.lookup("Tags".hashCode().toLong()))

                    for (i in 0 until recordsPerBatch) {
                        when(i % 4){
                            0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
                            1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
                            2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
                            3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
                        }
                    }
                    vec.valueCount = recordsPerBatch
                }
            }
        }
        writer.writeBatch()
    }
    writer.end()
    writer.close()

    // Need to close dictionary vectors
    for (id in dictionaryProvider.dictionaryIds) {
        dictionaryProvider.lookup(id).vector.close()
    }

    // 
*****************************************************************************************************************
    // Read from Arrow                                                          
                                       *
    // 
*****************************************************************************************************************

    //Accum results
    var accumDouble = 0.0
    var accumInt = 0L

    // Setting reading
    val fileToRead = File(filePath)
    val readStream = fileToRead.inputStream()
    val reader = ArrowFileReader(readStream.channel, 
RootAllocator(Long.MAX_VALUE))

    //println("Reading the arrow file : $filePath")
    val readRoot = reader.vectorSchemaRoot
    //val readSchema = readRoot.schema



    val arrowBlocks = reader.recordBlocks

    reader.recordBlocks.forEachIndexed { index, arrowBlock ->
        reader.loadRecordBatch(arrowBlock)  // ERROR HERE

        println("Reading Block[$index]: ${readRoot.rowCount} elements.")

        readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
            val minorType = fieldVector.minorType
            when(minorType){
                Types.MinorType.FLOAT8 -> {
                    val vecDouble = fieldVector as Float8Vector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCapacity){
                        accumDouble += vecDouble.get(i)
                    }
                }
                Types.MinorType.INT -> {
                    val vecDouble = fieldVector as IntVector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCapacity){
                        accumInt += vecDouble.get(i)
                    }
                }
                Types.MinorType.VARCHAR -> {
                    val vec = fieldVector as VarCharVector
                    val cap = vec.valueCapacity
                    var address = vec.dataBufferAddress
                    for (i in 0 until vec.valueCapacity){
                        println("Reading tag[$i] 
${vec.get(i).toString(Charsets.UTF_8)}")
                    }
                }
            }
        }
    }

    reader.close()

    println("Double accum: $accumDouble")
    println("Long accum: $accumInt")

}

/**
 * This is a helper function to create VarCharVector for dictionary purposes.
 */
fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
    return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name, 
allocator, null) as VarCharVector

Reply via email to