This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cc9020  ARROW-5861: [Java] Initial implement to convert Avro record 
with primitive types
7cc9020 is described below

commit 7cc902036b88811f8570051e5aa811532ecd2efd
Author: tianchen <niki...@alibaba-inc.com>
AuthorDate: Thu Jul 18 20:11:54 2019 -0700

    ARROW-5861: [Java] Initial implement to convert Avro record with primitive 
types
    
    Related to [ARROW-5861](https://issues.apache.org/jira/browse/ARROW-5861).
    Initial implementation to support converting Avro records with primitive types to Arrow objects.
    
    Author: tianchen <niki...@alibaba-inc.com>
    
    Closes #4812 from tianchen92/ARROW-5861 and squashes the following commits:
    
    24394780f <tianchen> use UnsupportedOperationException
    fa3f39afb <tianchen> resolve comments
    7c3a73054 <tianchen> add consumers and use GenericDatumReader
    61d2dac9d <tianchen> fix style
    54479c8c0 <tianchen> Initial implement to convert Avro record with 
primitive types
---
 java/adapter/avro/pom.xml                          |  14 ++
 .../main/java/org/apache/arrow/AvroToArrow.java    |  49 +++++
 .../java/org/apache/arrow/AvroToArrowUtils.java    | 210 +++++++++++++++++++++
 .../arrow/consumers/AvroBooleanConsumer.java}      |  34 +++-
 .../apache/arrow/consumers/AvroBytesConsumer.java  |  56 ++++++
 .../arrow/consumers/AvroDoubleConsumer.java}       |  34 +++-
 .../apache/arrow/consumers/AvroFloatConsumer.java} |  34 +++-
 .../apache/arrow/consumers/AvroIntConsumer.java}   |  34 +++-
 .../apache/arrow/consumers/AvroLongConsumer.java}  |  34 +++-
 .../apache/arrow/consumers/AvroStringConsumer.java |  56 ++++++
 .../java/org/apache/arrow/consumers/Consumer.java} |  20 +-
 .../java/org/apache/arrow/AvroToArrowTest.java     | 178 +++++++++++++++++
 .../avro/src/test/resources/schema/test.avsc       |   3 +-
 .../{test.avsc => test_primitive_boolean.avsc}     |  12 +-
 .../{test.avsc => test_primitive_bytes.avsc}       |  12 +-
 .../{test.avsc => test_primitive_double.avsc}      |  12 +-
 .../{test.avsc => test_primitive_float.avsc}       |  12 +-
 .../schema/{test.avsc => test_primitive_int.avsc}  |  12 +-
 .../schema/{test.avsc => test_primitive_long.avsc} |  12 +-
 .../{test.avsc => test_primitive_string.avsc}      |  12 +-
 20 files changed, 735 insertions(+), 105 deletions(-)

diff --git a/java/adapter/avro/pom.xml b/java/adapter/avro/pom.xml
index 4e13d57..8ef7548 100644
--- a/java/adapter/avro/pom.xml
+++ b/java/adapter/avro/pom.xml
@@ -28,6 +28,20 @@
 
   <dependencies>
 
+    <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory -->
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java 
b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
new file mode 100644
index 0000000..4801d69
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import java.io.IOException;
+
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Utility class to convert Avro objects to columnar Arrow format objects.
+ */
+public class AvroToArrow {
+
+  /**
+   * Fetch the data from {@link GenericDatumReader} and convert it to Arrow 
objects.
+   * @param schema avro schema.
+   * @param allocator Memory allocator to use.
+   * @return Arrow Data Objects {@link VectorSchemaRoot}
+   */
+  public static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, 
BaseAllocator allocator)
+      throws IOException {
+    Preconditions.checkNotNull(schema, "Avro schema object can not be null");
+
+    VectorSchemaRoot root = VectorSchemaRoot.create(
+        AvroToArrowUtils.avroToArrowSchema(schema), allocator);
+    AvroToArrowUtils.avroToArrowVectors(decoder, root);
+    return root;
+  }
+}
diff --git 
a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java 
b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
new file mode 100644
index 0000000..c142689
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
+import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.consumers.AvroBooleanConsumer;
+import org.apache.arrow.consumers.AvroBytesConsumer;
+import org.apache.arrow.consumers.AvroDoubleConsumer;
+import org.apache.arrow.consumers.AvroFloatConsumer;
+import org.apache.arrow.consumers.AvroIntConsumer;
+import org.apache.arrow.consumers.AvroLongConsumer;
+import org.apache.arrow.consumers.AvroStringConsumer;
+import org.apache.arrow.consumers.Consumer;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Class that does most of the work to convert Avro data into Arrow columnar 
format Vector objects.
+ */
+public class AvroToArrowUtils {
+
+  private static final int DEFAULT_BUFFER_SIZE = 256;
+
+  /**
+   * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the given Avro {@link Type}.
+   *
+   * <p>This method currently performs the following mapping of Avro data types to Arrow data types.
+   *
+   * <ul>
+   *   <li>STRING --> ArrowType.Utf8</li>
+   *   <li>INT --> ArrowType.Int(32, signed)</li>
+   *   <li>LONG --> ArrowType.Int(64, signed)</li>
+   *   <li>FLOAT --> 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+   *   <li>DOUBLE --> 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
+   *   <li>BOOLEAN --> ArrowType.Bool</li>
+   *   <li>BYTES --> ArrowType.Binary</li>
+   * </ul>
+   */
+  private static ArrowType getArrowType(Type type) {
+
+    Preconditions.checkNotNull(type, "Avro type object can't be null");
+
+    switch (type) {
+      case STRING:
+        return new ArrowType.Utf8();
+      case INT:
+        return new ArrowType.Int(32, /*signed=*/true);
+      case BOOLEAN:
+        return new ArrowType.Bool();
+      case LONG:
+        return new ArrowType.Int(64, /*signed=*/true);
+      case FLOAT:
+        return new ArrowType.FloatingPoint(SINGLE);
+      case DOUBLE:
+        return new ArrowType.FloatingPoint(DOUBLE);
+      case BYTES:
+        return new ArrowType.Binary();
+      default:
+        // Avro types without a mapping above are not supported.
+        throw new RuntimeException("Can't convert avro type %s to arrow type." 
+ type.getName());
+    }
+  }
+
+  /**
+   * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for 
the given Avro {@link Schema}.
+   */
+  public static org.apache.arrow.vector.types.pojo.Schema 
avroToArrowSchema(Schema schema) {
+
+    Preconditions.checkNotNull(schema, "Avro Schema object can't be null");
+    List<Field> arrowFields = new ArrayList<>();
+
+    Schema.Type type = schema.getType();
+    final Map<String, String> metadata = new HashMap<>();
+    schema.getObjectProps().forEach((k,v) -> metadata.put(k, v.toString()));
+
+    if (type == Type.RECORD) {
+      throw new UnsupportedOperationException();
+    } else if (type == Type.MAP) {
+      throw new UnsupportedOperationException();
+    } else if (type == Type.UNION) {
+      throw new UnsupportedOperationException();
+    } else if (type == Type.ARRAY) {
+      throw new UnsupportedOperationException();
+    } else if (type == Type.ENUM) {
+      throw new UnsupportedOperationException();
+    } else if (type == Type.NULL) {
+      throw new UnsupportedOperationException();
+    } else {
+      final FieldType fieldType = new FieldType(true, getArrowType(type), 
null, null);
+      arrowFields.add(new Field("", fieldType, null));
+    }
+
+    return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, 
/*metadata=*/ metadata);
+  }
+
+  /**
+   * Creates consumers to consume Avro values from the decoder, which reduces boxing/unboxing operations.
+   */
+  public static Consumer[] createAvroConsumers(VectorSchemaRoot root) {
+
+    Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
+    for (int i = 0; i < root.getFieldVectors().size(); i++) {
+      FieldVector vector = root.getFieldVectors().get(i);
+      Consumer consumer;
+      switch (vector.getMinorType()) {
+        case INT:
+          consumer = new AvroIntConsumer((IntVector) vector);
+          break;
+        case VARBINARY:
+          consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+          break;
+        case VARCHAR:
+          consumer = new AvroStringConsumer((VarCharVector) vector);
+          break;
+        case BIGINT:
+          consumer = new AvroLongConsumer((BigIntVector) vector);
+          break;
+        case FLOAT4:
+          consumer = new AvroFloatConsumer((Float4Vector) vector);
+          break;
+        case FLOAT8:
+          consumer = new AvroDoubleConsumer((Float8Vector) vector);
+          break;
+        case BIT:
+          consumer = new AvroBooleanConsumer((BitVector) vector);
+          break;
+        default:
+          throw new RuntimeException("could not get consumer from type:" + 
vector.getMinorType());
+      }
+      consumers[i] = consumer;
+    }
+    return consumers;
+  }
+
+  /**
+   * Iterate the given Avro {@link Decoder} object to fetch the data and 
transpose it to populate
+   * the given Arrow Vector objects.
+   * @param decoder avro decoder to read data.
+   * @param root Arrow {@link VectorSchemaRoot} object to populate
+   */
+  public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot 
root) throws IOException {
+
+    Preconditions.checkNotNull(decoder, "Avro decoder object can't be null");
+    Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
+
+    allocateVectors(root, DEFAULT_BUFFER_SIZE);
+    Consumer[] consumers = createAvroConsumers(root);
+    while (true) {
+      try {
+        for (Consumer consumer : consumers) {
+          consumer.consume(decoder);
+        }
+        // Reaching the end of the input throws EOFException, which ends the loop.
+      } catch (EOFException eofException) {
+        break;
+      }
+    }
+  }
+
+  private static void allocateVectors(VectorSchemaRoot root, int size) {
+    List<FieldVector> vectors = root.getFieldVectors();
+    for (FieldVector fieldVector : vectors) {
+      if (fieldVector instanceof BaseFixedWidthVector) {
+        ((BaseFixedWidthVector) fieldVector).allocateNew(size);
+      } else {
+        fieldVector.allocateNew();
+      }
+      fieldVector.setInitialCapacity(size);
+    }
+  }
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
similarity index 52%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to 
java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 15fdd76..7bbfac1 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.complex.impl.BitWriterImpl;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes boolean type values from an Avro decoder.
+ * Writes the data to a {@link BitVector}.
+ */
+public class AvroBooleanConsumer implements Consumer {
+
+  private final BitWriter writer;
+
+  public AvroBooleanConsumer(BitVector vector) {
+    this.writer = new BitWriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    writer.writeBit(decoder.readBoolean() ? 1 : 0);
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git 
a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
new file mode 100644
index 0000000..9c3eff7
--- /dev/null
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+import org.apache.arrow.vector.holders.VarBinaryHolder;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes bytes type values from an Avro decoder.
+ * Writes the data to a {@link VarBinaryVector}.
+ */
+public class AvroBytesConsumer implements Consumer {
+
+  private final VarBinaryWriter writer;
+  private final VarBinaryVector vector;
+
+  public AvroBytesConsumer(VarBinaryVector vector) {
+    this.vector = vector;
+    this.writer = new VarBinaryWriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    VarBinaryHolder holder = new VarBinaryHolder();
+    ByteBuffer byteBuffer = decoder.readBytes(null);
+
+    holder.start = 0;
+    holder.end = byteBuffer.capacity();
+    holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity());
+    holder.buffer.setBytes(0, byteBuffer);
+
+    writer.write(holder);
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
similarity index 51%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to 
java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 15fdd76..62dc315 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes double type values from an Avro decoder.
+ * Writes the data to a {@link Float8Vector}.
+ */
+public class AvroDoubleConsumer implements Consumer {
+
+  private final Float8Writer writer;
+
+  public AvroDoubleConsumer(Float8Vector vector) {
+    this.writer = new Float8WriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    writer.writeFloat8(decoder.readDouble());
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
similarity index 51%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to 
java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 15fdd76..2bec2b2 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes float type values from an Avro decoder.
+ * Writes the data to a {@link Float4Vector}.
+ */
+public class AvroFloatConsumer implements Consumer {
+
+  private final Float4Writer writer;
+
+  public AvroFloatConsumer(Float4Vector vector) {
+    this.writer = new Float4WriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    writer.writeFloat4(decoder.readFloat());
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
similarity index 52%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to 
java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index 15fdd76..60285f0 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.complex.impl.IntWriterImpl;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes int type values from an Avro decoder.
+ * Writes the data to an {@link IntVector}.
+ */
+public class AvroIntConsumer implements Consumer {
+
+  private final IntWriter writer;
+
+  public AvroIntConsumer(IntVector vector) {
+    this.writer = new IntWriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    writer.writeInt(decoder.readInt());
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
similarity index 51%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to 
java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 15fdd76..15756af 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes long type values from an Avro decoder.
+ * Writes the data to a {@link BigIntVector}.
+ */
+public class AvroLongConsumer implements Consumer {
+
+  private final BigIntWriter writer;
+
+  public AvroLongConsumer(BigIntVector vector) {
+    this.writer = new BigIntWriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    writer.writeBigInt(decoder.readLong());
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git 
a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
new file mode 100644
index 0000000..db438f9
--- /dev/null
+++ 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+import org.apache.arrow.vector.holders.VarCharHolder;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes string type values from an Avro decoder.
+ * Writes the data to a {@link VarCharVector}.
+ */
+public class AvroStringConsumer implements Consumer {
+
+  private final VarCharVector vector;
+  private final VarCharWriter writer;
+
+  public AvroStringConsumer(VarCharVector vector) {
+    this.vector = vector;
+    this.writer = new VarCharWriterImpl(vector);
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    VarCharHolder holder = new VarCharHolder();
+    ByteBuffer byteBuffer = decoder.readBytes(null);
+
+    holder.start = 0;
+    holder.end = byteBuffer.capacity();
+    holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity());
+    holder.buffer.setBytes(0, byteBuffer);
+
+    writer.write(holder);
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
similarity index 75%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index 15fdd76..b3c5281 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.avro.io.Decoder;
+
+/**
+ * An abstraction that is used to consume values from an Avro decoder.
+ */
+public interface Consumer {
+
+  void consume(Decoder decoder) throws IOException;
 }
diff --git 
a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java 
b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
new file mode 100644
index 0000000..d880639
--- /dev/null
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroToArrowTest {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+
+  private BaseAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  private Schema getSchema(String schemaName) throws Exception {
+    Path schemaPath = 
Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(),
+        "schema", schemaName);
+    return new Schema.Parser().parse(schemaPath.toFile());
+  }
+
+  private VectorSchemaRoot writeAndReadPrimitive(Schema schema, List data) 
throws Exception {
+    File dataFile = TMP.newFile();
+
+    BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new 
FileOutputStream(dataFile), null);
+    DatumWriter writer = new GenericDatumWriter(schema);
+    BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new 
FileInputStream(dataFile), null);
+
+    for (Object value : data) {
+      writer.write(value, encoder);
+    }
+
+    return AvroToArrow.avroToArrow(schema, decoder, allocator);
+  }
+
+  @Test
+  public void testStringType() throws Exception {
+    Schema schema = getSchema("test_primitive_string.avsc");
+    ArrayList<String> data = new ArrayList(Arrays.asList("v1", "v2", "v3", 
"v4", "v5"));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testIntType() throws Exception {
+    Schema schema = getSchema("test_primitive_int.avsc");
+    ArrayList<Integer> data = new ArrayList(Arrays.asList(1, 2, 3, 4, 5));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testLongType() throws Exception {
+    Schema schema = getSchema("test_primitive_long.avsc");
+    ArrayList<Long> data = new ArrayList(Arrays.asList(1L, 2L, 3L, 4L, 5L));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testFloatType() throws Exception {
+    Schema schema = getSchema("test_primitive_float.avsc");
+    ArrayList<Float> data = new ArrayList(Arrays.asList(1.1f, 2.2f, 3.3f, 
4.4f, 5.5f));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testDoubleType() throws Exception {
+    Schema schema = getSchema("test_primitive_double.avsc");
+    ArrayList<Double> data = new ArrayList(Arrays.asList(1.1, 2.2, 3.3, 4.4, 
5.5));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testBytesType() throws Exception {
+    Schema schema = getSchema("test_primitive_bytes.avsc");
+    ArrayList<ByteBuffer> data = new ArrayList(Arrays.asList(
+        ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8))));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testBooleanType() throws Exception {
+    Schema schema = getSchema("test_primitive_boolean.avsc");
+    ArrayList<Boolean> data = new ArrayList(Arrays.asList(true, false, true, 
false, true));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector 
vector) {
+    assertEquals(data.size(), vector.getValueCount());
+    for (int i = 0; i < data.size(); i++) {
+      Object value1 = data.get(i);
+      Object value2 = vector.getObject(i);
+      if (schema.getType() == Schema.Type.BYTES) {
+        value2 = ByteBuffer.wrap((byte[]) value2);
+      } else if (schema.getType() == Schema.Type.STRING) {
+        value2 = value2.toString();
+      }
+      assertTrue(Objects.equals(value1, value2));
+    }
+  }
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc 
b/java/adapter/avro/src/test/resources/schema/test.avsc
index 15fdd76..92c0873 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test.avsc
@@ -15,7 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
+{
+ "namespace": "org.apache.arrow.avro",
  "type": "record",
  "name": "User",
  "fields": [
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc
index 15fdd76..7652ce7 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "boolean",
+ "name": "TestBoolean"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc
index 15fdd76..5102430 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "bytes",
+ "name": "TestBytes"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc
index 15fdd76..d1ae0b6 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "double",
+ "name": "TestDouble"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc
index 15fdd76..675d109 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "float",
+ "name": "TestFloat"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc
index 15fdd76..8fc8488 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "int",
+ "name": "TestInt"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc
index 15fdd76..b970610 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "long",
+ "name": "TestLong"
 }
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc
similarity index 76%
copy from java/adapter/avro/src/test/resources/schema/test.avsc
copy to java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc
index 15fdd76..b4a89a7 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-{"namespace": "org.apache.arrow.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "string",
+ "name": "TestString"
 }

Reply via email to