Hi Antonio,
My memory is a little rusty now, but if I remember correctly, when writing
to a dictionary-encoded vector you should write the encoded values (the
dictionary indices) rather than the decoded ones. This block in your code:
// *************************************************************************
// Write to Arrow
// *************************************************************************
ArrowType.Utf8() -> {
    val vec = schemaRoot.getVector(it.name) as VarCharVector
    vec.setInitialCapacity(recordsPerBatch)
    vec.allocateNew()
    //val encoded = DictionaryEncoder.encode(vec,
    //    dictionaryProvider.lookup("Tags".hashCode().toLong()))
    for (i in 0 until recordsPerBatch) {
        when(i % 4){
            0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
            1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
            2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
            3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
        }
    }
    vec.valueCount = recordsPerBatch
}
writes the decoded (UTF-8) values, but it should be writing the encoded
values, i.e. the dictionary indices.
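As a rough sketch of what I mean (untested; the `raw` vector name and the
`allocator` variable are just illustrative, and `dictionaryProvider` is the
one from your code), the write path would look something like:

```kotlin
// Fill a plain VarCharVector with the raw UTF-8 values first (not the
// vector obtained from the VectorSchemaRoot).
val raw = VarCharVector("tag-raw", allocator)
raw.allocateNew()
for (i in 0 until recordsPerBatch) {
    raw.setSafe(i, "abcd"[i % 4].toString().toByteArray(Charsets.UTF_8))
}
raw.valueCount = recordsPerBatch

// Encode against the dictionary: with a null indexType in the
// DictionaryEncoding, the result should be an IntVector of indices 0..3,
// and those indices are what belongs in the record batch.
val dictionary = dictionaryProvider.lookup("Tags".hashCode().toLong())
val encoded = DictionaryEncoder.encode(raw, dictionary) as IntVector
raw.close()
```

The encoded vector is then what you transfer into the column you actually
write, while the dictionary itself goes out via the DictionaryProvider you
already pass to ArrowFileWriter.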
However, there may also be an issue that
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
creates a vector of the decoded type instead of the encoded (index) type,
so you would probably need to create the vectors yourself instead of using
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE)).
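Something along these lines (again untested, just to illustrate the idea;
`temperature`, `humidity` and `tag` are the Field objects from your code):
because the "tag" column travels as dictionary indices, its in-memory
vector should be of the index type (IntVector), not VarCharVector:

```kotlin
// Build the vectors by hand instead of using VectorSchemaRoot.create():
// the dictionary-encoded "tag" column is stored as indices, so its
// in-memory vector is an IntVector even though the field's logical
// type is Utf8.
val allocator = RootAllocator(Long.MAX_VALUE)
val temperatureVec = Float8Vector("temperature", allocator)
val humidityVec = IntVector("humidity", allocator)
val tagVec = IntVector("tag", allocator)
val schemaRoot = VectorSchemaRoot(
    listOf(temperature, humidity, tag),           // your Field objects
    listOf(temperatureVec, humidityVec, tagVec),  // matching vectors
    0                                             // initial row count
)
```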
For more details please see:
https://github.com/apache/arrow/pull/2681
On Mon, Feb 4, 2019 at 2:07 PM Wes McKinney <[email protected]> wrote:
> Not sure if you're subscribed to user@, but in case you have advice:
>
> ---------- Forwarded message ---------
> From: Antonio Vilches <[email protected]>
> Date: Mon, Feb 4, 2019 at 8:41 AM
> Subject: Issue while loading an arrow file with dictionaries
> To: [email protected] <[email protected]>
>
>
> Hi there,
>
>
>
> I'm a newbie Arrow user and I want to develop an application that
> loads/stores information from/to Arrow. For this purpose, I started a
> very simple Kotlin application that first writes an Arrow file and
> later reads the information back from that file. It uses the Arrow
> Java API (see code below).
>
> The application defines a schema with 3 fields: 1 Float, 1 Int and 1
> Varchar. It then writes 3 batches of ten elements (in the Writer
> section). In the last section (Reader), it tries to load the
> information that was stored in the Arrow file.
>
>
>
> The application works properly if I avoid the usage of dictionaries
> for storing and loading Strings in UTF-8 format. However, if I use
> dictionaries I get an exception (see below) while trying to load the
> first batch using the ArrowFileReader.loadRecordBatch() function
> (line annotated with the //ERROR HERE tag).
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException: not all
> nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52],
> udle: [34 448..458]]
>
>     at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
>     at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
>     at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
>     at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
>     at ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
>
>
>
> I'm not sure whether I'm doing something wrong or whether this is a
> bug. Could anyone assist me with this issue?
>
> Thanks in advance,
>
> Antonio Vilches.
>
> fun main(args: Array<String>) {
>
> println("Writing and reading an apache arrow file")
> val numRecords = (10 * 3) // 30 in total
> val initialBatchSize = 10
>
>
> val recordsPerBatch = initialBatchSize
>
> val numBatches = if ((numRecords % recordsPerBatch) == 0){
> numRecords / recordsPerBatch
> }else{
> numRecords / recordsPerBatch + 1
> }
>
>     println("Reading and writing $numRecords records in $numBatches batches.")
>
>     // *********************************************************************
>     // Generate dictionary
>     // *********************************************************************
> val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
> val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
> vector.setInitialCapacity(4)
> vector.allocateNew()
>
> vector.set(0, "a".toByteArray(Charsets.UTF_8))
> vector.set(1, "b".toByteArray(Charsets.UTF_8))
> vector.set(2, "c".toByteArray(Charsets.UTF_8))
> vector.set(3, "d".toByteArray(Charsets.UTF_8))
>
> vector.valueCount = 4
> val dictionary = Dictionary(vector,
> DictionaryEncoding("Tags".hashCode().toLong(), false, null))
> dictionaryProvider.put(dictionary)
>
>     // *********************************************************************
>     // Create schema
>     // *********************************************************************
> val filePath = "./example.arrow"
>
> //Create Fields
>     val temperature = Field("temperature",
>         FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null)
>     val humidity = Field("humidity",
>         FieldType.nullable(ArrowType.Int(32, true)), null)
>     val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
>     val tag = Field("tag",
>         FieldType(true, ArrowType.Utf8(), /*null*/ dic.encoding, null), null)
>
> //Create schema
> val builder = mutableListOf<Field>()
> builder.add(temperature)
> builder.add(humidity)
> builder.add(tag)
> val schema = Schema(builder, null)
>
>     // *********************************************************************
>     // Write to Arrow
>     // *********************************************************************
>
>     val fileToWrite = File(filePath)
>     val writeStream = fileToWrite.outputStream()
>     val schemaRoot = VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
>     val writer = ArrowFileWriter(schemaRoot,
>         /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider,
>         writeStream.channel)
>
> writer.start()
>
> schemaRoot.rowCount = recordsPerBatch
>     for (batch in 0 until numBatches){
>         val numItemsBatch = min(recordsPerBatch, numRecords - batch * recordsPerBatch)
>         schemaRoot.rowCount = numItemsBatch
>         schemaRoot.schema.fields.forEach {
>             when(it.type){
>                 ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
>                     val vector = schemaRoot.getVector(it.name) as Float8Vector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toDouble())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Int(32, true) -> {
>                     val vector = schemaRoot.getVector(it.name) as IntVector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toInt())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Utf8() -> {
>                     val vec = schemaRoot.getVector(it.name) as VarCharVector
>                     vec.setInitialCapacity(recordsPerBatch)
>                     vec.allocateNew()
>
>                     //val encoded = DictionaryEncoder.encode(vec,
>                     //    dictionaryProvider.lookup("Tags".hashCode().toLong()))
>
>                     for (i in 0 until recordsPerBatch) {
>                         when(i % 4){
>                             0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
>                             1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
>                             2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
>                             3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
>                         }
>                     }
>                     vec.valueCount = recordsPerBatch
>                 }
>             }
>         }
>         writer.writeBatch()
>     }
> writer.writeBatch()
> }
> writer.end()
> writer.close()
>
> // Need to close dictionary vectors
> for (id in dictionaryProvider.dictionaryIds) {
> dictionaryProvider.lookup(id).vector.close()
> }
>
>     // *********************************************************************
>     // Read from Arrow
>     // *********************************************************************
>
> //Accum results
> var accumDouble = 0.0
> var accumInt = 0L
>
> // Setting reading
> val fileToRead = File(filePath)
> val readStream = fileToRead.inputStream()
>     val reader = ArrowFileReader(readStream.channel, RootAllocator(Long.MAX_VALUE))
>
> //println("Reading the arrow file : $filePath")
> val readRoot = reader.vectorSchemaRoot
> //val readSchema = readRoot.schema
>
>
>
> val arrowBlocks = reader.recordBlocks
>
>     reader.recordBlocks.forEachIndexed { index, arrowBlock ->
>         reader.loadRecordBatch(arrowBlock) // ERROR HERE
>
>         println("Reading Block[$index]: ${readRoot.rowCount} elements.")
>
>         readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
>             val minorType = fieldVector.minorType
>             when(minorType){
>                 Types.MinorType.FLOAT8 -> {
>                     val vecDouble = fieldVector as Float8Vector
>                     for (i in 0 until vecDouble.valueCapacity){
>                         accumDouble += vecDouble.get(i)
>                     }
>                 }
>                 Types.MinorType.INT -> {
>                     val vecInt = fieldVector as IntVector
>                     for (i in 0 until vecInt.valueCapacity){
>                         accumInt += vecInt.get(i)
>                     }
>                 }
>                 Types.MinorType.VARCHAR -> {
>                     val vec = fieldVector as VarCharVector
>                     for (i in 0 until vec.valueCapacity){
>                         println("Reading tag[$i] ${vec.get(i).toString(Charsets.UTF_8)}")
>                     }
>                 }
>             }
>         }
>     }
>
> reader.close()
>
> println("Double accum: $accumDouble")
> println("Long accum: $accumInt")
>
> }
>
> /**
>  * This is a helper function to create a VarCharVector for dictionary
>  * purposes.
>  */
> fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
>     return FieldType.nullable(ArrowType.Utf8())
>         .createNewSingleVector(name, allocator, null) as VarCharVector
> }