This is an automated email from the ASF dual-hosted git repository. emkornfield pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 7cc9020 ARROW-5861: [Java] Initial implement to convert Avro record with primitive types 7cc9020 is described below commit 7cc902036b88811f8570051e5aa811532ecd2efd Author: tianchen <niki...@alibaba-inc.com> AuthorDate: Thu Jul 18 20:11:54 2019 -0700 ARROW-5861: [Java] Initial implement to convert Avro record with primitive types Related to [ARROW-5861](https://issues.apache.org/jira/browse/ARROW-5861). Initial implement to support convert Avro record with primitive types to Arrow objects. Author: tianchen <niki...@alibaba-inc.com> Closes #4812 from tianchen92/ARROW-5861 and squashes the following commits: 24394780f <tianchen> use UnsupportedOperationException fa3f39afb <tianchen> resolve comments 7c3a73054 <tianchen> add consumers and use GenericDatumReader 61d2dac9d <tianchen> fix style 54479c8c0 <tianchen> Initial implement to convert Avro record with primitive types --- java/adapter/avro/pom.xml | 14 ++ .../main/java/org/apache/arrow/AvroToArrow.java | 49 +++++ .../java/org/apache/arrow/AvroToArrowUtils.java | 210 +++++++++++++++++++++ .../arrow/consumers/AvroBooleanConsumer.java} | 34 +++- .../apache/arrow/consumers/AvroBytesConsumer.java | 56 ++++++ .../arrow/consumers/AvroDoubleConsumer.java} | 34 +++- .../apache/arrow/consumers/AvroFloatConsumer.java} | 34 +++- .../apache/arrow/consumers/AvroIntConsumer.java} | 34 +++- .../apache/arrow/consumers/AvroLongConsumer.java} | 34 +++- .../apache/arrow/consumers/AvroStringConsumer.java | 56 ++++++ .../java/org/apache/arrow/consumers/Consumer.java} | 20 +- .../java/org/apache/arrow/AvroToArrowTest.java | 178 +++++++++++++++++ .../avro/src/test/resources/schema/test.avsc | 3 +- .../{test.avsc => test_primitive_boolean.avsc} | 12 +- .../{test.avsc => test_primitive_bytes.avsc} | 12 +- .../{test.avsc => test_primitive_double.avsc} | 12 +- .../{test.avsc => test_primitive_float.avsc} | 12 +- .../schema/{test.avsc => test_primitive_int.avsc} | 12 +- 
.../schema/{test.avsc => test_primitive_long.avsc} | 12 +- .../{test.avsc => test_primitive_string.avsc} | 12 +- 20 files changed, 735 insertions(+), 105 deletions(-) diff --git a/java/adapter/avro/pom.xml b/java/adapter/avro/pom.xml index 4e13d57..8ef7548 100644 --- a/java/adapter/avro/pom.xml +++ b/java/adapter/avro/pom.xml @@ -28,6 +28,20 @@ <dependencies> + <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java new file mode 100644 index 0000000..4801d69 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow; + +import java.io.IOException; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +/** + * Utility class to convert Avro objects to columnar Arrow format objects. + */ +public class AvroToArrow { + + /** + * Fetch the data from {@link GenericDatumReader} and convert it to Arrow objects. + * @param schema avro schema. + * @param allocator Memory allocator to use. + * @return Arrow Data Objects {@link VectorSchemaRoot} + */ + public static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, BaseAllocator allocator) + throws IOException { + Preconditions.checkNotNull(schema, "Avro schema object can not be null"); + + VectorSchemaRoot root = VectorSchemaRoot.create( + AvroToArrowUtils.avroToArrowSchema(schema), allocator); + AvroToArrowUtils.avroToArrowVectors(decoder, root); + return root; + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java new file mode 100644 index 0000000..c142689 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; +import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.consumers.AvroBooleanConsumer; +import org.apache.arrow.consumers.AvroBytesConsumer; +import org.apache.arrow.consumers.AvroDoubleConsumer; +import org.apache.arrow.consumers.AvroFloatConsumer; +import org.apache.arrow.consumers.AvroIntConsumer; +import org.apache.arrow.consumers.AvroLongConsumer; +import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.Consumer; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.io.Decoder; + +/** + * Class that 
does most of the work to convert Avro data into Arrow columnar format Vector objects. + */ +public class AvroToArrowUtils { + + private static final int DEFAULT_BUFFER_SIZE = 256; + + /** + * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the {@link Schema.Field} + * + <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types. + * + * <ul> + * <li>STRING --> ArrowType.Utf8</li> + * <li>INT --> ArrowType.Int(32, signed)</li> + * <li>LONG --> ArrowType.Int(64, signed)</li> + * <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li> + * <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li> + * <li>BOOLEAN --> ArrowType.Bool</li> + * <li>BYTES --> ArrowType.Binary</li> + * </ul> + */ + private static ArrowType getArrowType(Type type) { + + Preconditions.checkNotNull(type, "Avro type object can't be null"); + + switch (type) { + case STRING: + return new ArrowType.Utf8(); + case INT: + return new ArrowType.Int(32, /*signed=*/true); + case BOOLEAN: + return new ArrowType.Bool(); + case LONG: + return new ArrowType.Int(64, /*signed=*/true); + case FLOAT: + return new ArrowType.FloatingPoint(SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(DOUBLE); + case BYTES: + return new ArrowType.Binary(); + default: + // no-op, shouldn't get here + throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName()); + } + } + + /** + * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}. 
+ */ + public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) { + + Preconditions.checkNotNull(schema, "Avro Schema object can't be null"); + List<Field> arrowFields = new ArrayList<>(); + + Schema.Type type = schema.getType(); + final Map<String, String> metadata = new HashMap<>(); + schema.getObjectProps().forEach((k,v) -> metadata.put(k, v.toString())); + + if (type == Type.RECORD) { + throw new UnsupportedOperationException(); + } else if (type == Type.MAP) { + throw new UnsupportedOperationException(); + } else if (type == Type.UNION) { + throw new UnsupportedOperationException(); + } else if (type == Type.ARRAY) { + throw new UnsupportedOperationException(); + } else if (type == Type.ENUM) { + throw new UnsupportedOperationException(); + } else if (type == Type.NULL) { + throw new UnsupportedOperationException(); + } else { + final FieldType fieldType = new FieldType(true, getArrowType(type), null, null); + arrowFields.add(new Field("", fieldType, null)); + } + + return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata); + } + + /** + * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations. 
+ */ + public static Consumer[] createAvroConsumers(VectorSchemaRoot root) { + + Consumer[] consumers = new Consumer[root.getFieldVectors().size()]; + for (int i = 0; i < root.getFieldVectors().size(); i++) { + FieldVector vector = root.getFieldVectors().get(i); + Consumer consumer; + switch (vector.getMinorType()) { + case INT: + consumer = new AvroIntConsumer((IntVector) vector); + break; + case VARBINARY: + consumer = new AvroBytesConsumer((VarBinaryVector) vector); + break; + case VARCHAR: + consumer = new AvroStringConsumer((VarCharVector) vector); + break; + case BIGINT: + consumer = new AvroLongConsumer((BigIntVector) vector); + break; + case FLOAT4: + consumer = new AvroFloatConsumer((Float4Vector) vector); + break; + case FLOAT8: + consumer = new AvroDoubleConsumer((Float8Vector) vector); + break; + case BIT: + consumer = new AvroBooleanConsumer((BitVector) vector); + break; + default: + throw new RuntimeException("could not get consumer from type:" + vector.getMinorType()); + } + consumers[i] = consumer; + } + return consumers; + } + + /** + * Iterate the given Avro {@link Decoder} object to fetch the data and transpose it to populate + * the given Arrow Vector objects. + * @param decoder avro decoder to read data. + * @param root Arrow {@link VectorSchemaRoot} object to populate + */ + public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) throws IOException { + + Preconditions.checkNotNull(decoder, "Avro decoder object can't be null"); + Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null"); + + allocateVectors(root, DEFAULT_BUFFER_SIZE); + Consumer[] consumers = createAvroConsumers(root); + while (true) { + try { + for (Consumer consumer : consumers) { + consumer.consume(decoder); + } + //reach end will throw EOFException. 
+ } catch (EOFException eofException) { + break; + } + } + } + + private static void allocateVectors(VectorSchemaRoot root, int size) { + List<FieldVector> vectors = root.getFieldVectors(); + for (FieldVector fieldVector : vectors) { + if (fieldVector instanceof BaseFixedWidthVector) { + ((BaseFixedWidthVector) fieldVector).allocateNew(size); + } else { + fieldVector.allocateNew(); + } + fieldVector.setInitialCapacity(size); + } + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java similarity index 52% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java index 15fdd76..7bbfac1 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java @@ -15,12 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.complex.impl.BitWriterImpl; +import org.apache.arrow.vector.complex.writer.BitWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume boolean type values from avro decoder. + * Write the data to {@link BitVector}. + */ +public class AvroBooleanConsumer implements Consumer { + + private final BitWriter writer; + + public AvroBooleanConsumer(BitVector vector) { + this.writer = new BitWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeBit(decoder.readBoolean() ? 
1 : 0); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java new file mode 100644 index 0000000..9c3eff7 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; +import org.apache.arrow.vector.complex.writer.VarBinaryWriter; +import org.apache.arrow.vector.holders.VarBinaryHolder; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume bytes type values from avro decoder. + * Write the data to {@link VarBinaryVector}. 
+ */ +public class AvroBytesConsumer implements Consumer { + + private final VarBinaryWriter writer; + private final VarBinaryVector vector; + + public AvroBytesConsumer(VarBinaryVector vector) { + this.vector = vector; + this.writer = new VarBinaryWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + VarBinaryHolder holder = new VarBinaryHolder(); + ByteBuffer byteBuffer = decoder.readBytes(null); + + holder.start = 0; + holder.end = byteBuffer.capacity(); + holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity()); + holder.buffer.setBytes(0, byteBuffer); + + writer.write(holder); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java similarity index 51% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java index 15fdd76..62dc315 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java @@ -15,12 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.complex.impl.Float8WriterImpl; +import org.apache.arrow.vector.complex.writer.Float8Writer; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume double type values from avro decoder. + * Write the data to {@link Float8Vector}. 
+ */ +public class AvroDoubleConsumer implements Consumer { + + /** Writer wrapping the target vector; created in the constructor. */ private final Float8Writer writer; + + /** Creates a consumer that writes to the given vector through a Float8WriterImpl. */ public AvroDoubleConsumer(Float8Vector vector) { + this.writer = new Float8WriterImpl(vector); + } + + /** Reads one double from the decoder, writes it through the writer, then moves the writer position forward by one. */ @Override + public void consume(Decoder decoder) throws IOException { + writer.writeFloat8(decoder.readDouble()); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java similarity index 51% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java index 15fdd76..2bec2b2 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java @@ -15,12 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.complex.impl.Float4WriterImpl; +import org.apache.arrow.vector.complex.writer.Float4Writer; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume float type values from avro decoder. + * Write the data to {@link Float4Vector}.
+ */ +public class AvroFloatConsumer implements Consumer { + + /** Writer wrapping the target vector; created in the constructor. */ private final Float4Writer writer; + + /** Creates a consumer that writes to the given vector through a Float4WriterImpl. */ public AvroFloatConsumer(Float4Vector vector) { + this.writer = new Float4WriterImpl(vector); + } + + /** Reads one float from the decoder, writes it through the writer, then moves the writer position forward by one. */ @Override + public void consume(Decoder decoder) throws IOException { + writer.writeFloat4(decoder.readFloat()); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java similarity index 52% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java index 15fdd76..60285f0 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java @@ -15,12 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.complex.impl.IntWriterImpl; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume int type values from avro decoder. + * Write the data to {@link IntVector}.
+ */ +public class AvroIntConsumer implements Consumer { + + /** Writer wrapping the target vector; created in the constructor. */ private final IntWriter writer; + + /** Creates a consumer that writes to the given vector through an IntWriterImpl. */ public AvroIntConsumer(IntVector vector) { + this.writer = new IntWriterImpl(vector); + } + + /** Reads one int from the decoder, writes it through the writer, then moves the writer position forward by one. */ @Override + public void consume(Decoder decoder) throws IOException { + writer.writeInt(decoder.readInt()); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java similarity index 51% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java index 15fdd76..15756af 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java @@ -15,12 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume long type values from avro decoder. + * Write the data to {@link BigIntVector}.
+ */ +public class AvroLongConsumer implements Consumer { + + /** Writer wrapping the target vector; created in the constructor. */ private final BigIntWriter writer; + + /** Creates a consumer that writes to the given vector through a BigIntWriterImpl. */ public AvroLongConsumer(BigIntVector vector) { + this.writer = new BigIntWriterImpl(vector); + } + + /** Reads one long from the decoder, writes it through the writer, then moves the writer position forward by one. */ @Override + public void consume(Decoder decoder) throws IOException { + writer.writeBigInt(decoder.readLong()); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java new file mode 100644 index 0000000..db438f9 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.impl.VarCharWriterImpl; +import org.apache.arrow.vector.complex.writer.VarCharWriter; +import org.apache.arrow.vector.holders.VarCharHolder; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume string type values from avro decoder.
+ * Write the data to {@link VarCharVector}. + */ +public class AvroStringConsumer implements Consumer { + + private final VarCharVector vector; + private final VarCharWriter writer; + + public AvroStringConsumer(VarCharVector vector) { + this.vector = vector; + this.writer = new VarCharWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + VarCharHolder holder = new VarCharHolder(); + ByteBuffer byteBuffer = decoder.readBytes(null); + + holder.start = 0; + holder.end = byteBuffer.capacity(); + holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity()); + holder.buffer.setBytes(0, byteBuffer); + + writer.write(holder); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java similarity index 75% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java index 15fdd76..b3c5281 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java @@ -15,12 +15,16 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.avro.io.Decoder; + +/** + * An abstraction that is used to consume values from avro decoder. 
+ */ +public interface Consumer { + + /** Consumes data from the given decoder; the implementations in this patch each read a single value and write it to an Arrow vector. May throw IOException. */ void consume(Decoder decoder) throws IOException; } diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java new file mode 100644 index 0000000..d880639 --- /dev/null +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Tests that write primitive Avro values to a temporary file and check the vector produced by AvroToArrow.avroToArrow against the written values. */ public class AvroToArrowTest { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + private BaseAllocator allocator; + + /** Creates a fresh RootAllocator before each test. */ @Before + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + /** Parses the Avro schema file named schemaName from the "schema" directory under the resource root resolved through TestWriteReadAvroRecord. */ private Schema getSchema(String schemaName) throws Exception { + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", schemaName); + return new Schema.Parser().parse(schemaPath.toFile()); + } + + /** Writes every element of data to a new temporary file with a direct binary encoder, then converts that file's content with AvroToArrow.avroToArrow through a direct binary decoder that was opened on the same file before the writes. */ private VectorSchemaRoot writeAndReadPrimitive(Schema schema, List data) throws Exception { + File dataFile = TMP.newFile(); + + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + + for (Object value : data) { + writer.write(value, encoder); 
+ } + + return AvroToArrow.avroToArrow(schema, decoder, allocator); + } + + @Test + public void testStringType() throws Exception { + Schema schema = getSchema("test_primitive_string.avsc"); + ArrayList<String> data = new ArrayList(Arrays.asList("v1", "v2", "v3", "v4", "v5")); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testIntType() throws Exception { + Schema schema = getSchema("test_primitive_int.avsc"); + ArrayList<Integer> data = new ArrayList(Arrays.asList(1, 2, 3, 4, 5)); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testLongType() throws Exception { + Schema schema = getSchema("test_primitive_long.avsc"); + ArrayList<Long> data = new ArrayList(Arrays.asList(1L, 2L, 3L, 4L, 5L)); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testFloatType() throws Exception { + Schema schema = getSchema("test_primitive_float.avsc"); + ArrayList<Float> data = new ArrayList(Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f)); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testDoubleType() throws Exception { + Schema schema = getSchema("test_primitive_double.avsc"); + ArrayList<Double> data = new ArrayList(Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5)); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testBytesType() throws Exception { + Schema schema = 
getSchema("test_primitive_bytes.avsc"); + ArrayList<ByteBuffer> data = new ArrayList(Arrays.asList( + ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8)))); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + @Test + public void testBooleanType() throws Exception { + Schema schema = getSchema("test_primitive_boolean.avsc"); + ArrayList<Boolean> data = new ArrayList(Arrays.asList(true, false, true, false, true)); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(schema, data, vector); + } + + /** Asserts that the vector's value count equals data.size() and that each element equals the vector's object at the same index; BYTES results are wrapped in a ByteBuffer and STRING results converted with toString() before comparing. */ private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector vector) { + assertEquals(data.size(), vector.getValueCount()); + for (int i = 0; i < data.size(); i++) { + Object value1 = data.get(i); + Object value2 = vector.getObject(i); + if (schema.getType() == Schema.Type.BYTES) { + value2 = ByteBuffer.wrap((byte[]) value2); + } else if (schema.getType() == Schema.Type.STRING) { + value2 = value2.toString(); + } + assertTrue(Objects.equals(value1, value2)); + } + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test.avsc index 15fdd76..92c0873 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test.avsc @@ -15,7 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", +{ + "namespace": "org.apache.arrow.avro", "type": "record", "name": "User", "fields": [ diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc index 15fdd76..7652ce7 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc @@ -15,12 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "boolean", + "name": "TestBoolean" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc index 15fdd76..5102430 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc @@ -15,12 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "bytes", + "name": "TestBytes" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc index 15fdd76..d1ae0b6 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc @@ -15,12 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "double", + "name": "TestDouble" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc index 15fdd76..675d109 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc @@ -15,12 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "float", + "name": "TestFloat" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc index 15fdd76..8fc8488 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc @@ -15,12 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "int", + "name": "TestInt" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc index 15fdd76..b970610 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc @@ -15,12 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "long", + "name": "TestLong" } diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc similarity index 76% copy from java/adapter/avro/src/test/resources/schema/test.avsc copy to java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc index 15fdd76..b4a89a7 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc @@ -15,12 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "string", + "name": "TestString" }