more updates

Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/0f56aaad
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/0f56aaad
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/0f56aaad

Branch: refs/heads/orc-72
Commit: 0f56aaade0b33c5473d640fd70adb7ee59a694a3
Parents: 73cdb4c
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Oct 11 15:25:10 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Oct 11 15:25:10 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/orc/bench/TaxiToAvro.java   |   4 +-
 .../bench/parquet/DataWritableWriteSupport.java |  18 +-
 .../orc/bench/parquet/DataWritableWriter.java   | 466 ++++++++-----------
 .../apache/orc/bench/parquet/ParquetScan.java   |   6 +-
 .../apache/orc/bench/parquet/RowInBatch.java    |  65 ++-
 5 files changed, 283 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
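
For context, here is a rough sketch of how the reworked write path fits together, based only on the signatures visible in this diff. The class, schema strings, and field names below are illustrative assumptions, not part of the commit; the benchmark presumably derives the Parquet message type from the ORC schema elsewhere.

package org.apache.orc.bench.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Illustrative only: wires the two schemas through the Configuration the way
// DataWritableWriteSupport.init() expects to read them back.
public class WriteSupportSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // ORC view of the rows (hypothetical schema, not from the benchmark).
    TypeDescription orcSchema =
        TypeDescription.fromString("struct<x:int,y:string>");

    // Matching Parquet message type, written by hand for this sketch.
    MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message sketch { optional int32 x; optional binary y; }");

    // Both schemas now travel through the Configuration.
    DataWritableWriteSupport.setSchema(parquetSchema, orcSchema, conf);

    // RowInBatch wraps a VectorizedRowBatch plus a column-id -> ColumnVector
    // map, so the writers can address nested columns by ORC type id.
    RowInBatch record = new RowInBatch(orcSchema);

    // Fill record.batch with rows, then hand `record` to a Parquet writer
    // built around DataWritableWriteSupport; write(record) emits every row
    // currently in the batch.
  }
}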


http://git-wip-us.apache.org/repos/asf/orc/blob/0f56aaad/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
index 2b14f50..9fd2f23 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
@@ -39,12 +39,12 @@ public class TaxiToAvro {
   }
 
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     Configuration conf = new Configuration();
     AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf,
         getCodec(args[1]));
     VectorizedRowBatch batch = schema.createRowBatch();
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
+    for(String inFile: Utilities.sliceArray(args, 2)) {
       CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
       while (reader.nextBatch(batch)) {
         writer.writeBatch(batch);

http://git-wip-us.apache.org/repos/asf/orc/blob/0f56aaad/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
index f4621e5..2b8a1d3 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
@@ -11,13 +11,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.io.parquet.write;
+package org.apache.orc.bench.parquet;
 
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 
+import org.apache.orc.TypeDescription;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
@@ -28,15 +28,20 @@ import org.apache.parquet.schema.MessageTypeParser;
  * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
  *
  */
-public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
+public class DataWritableWriteSupport extends WriteSupport<RowInBatch> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
+  public static final String HIVE_SCHEMA = "hive.schema";
 
   private DataWritableWriter writer;
   private MessageType schema;
+  private TypeDescription hiveType;
 
-  public static void setSchema(final MessageType schema, final Configuration configuration) {
+  public static void setSchema(final MessageType schema,
+                               final TypeDescription hiveType,
+                               final Configuration configuration) {
     configuration.set(PARQUET_HIVE_SCHEMA, schema.toString());
+    configuration.set(HIVE_SCHEMA, hiveType.toString());
   }
 
   public static MessageType getSchema(final Configuration configuration) {
@@ -46,16 +51,17 @@ public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
   @Override
   public WriteContext init(final Configuration configuration) {
     schema = getSchema(configuration);
+    hiveType = TypeDescription.fromString(configuration.get(HIVE_SCHEMA));
     return new WriteContext(schema, new HashMap<String, String>());
   }
 
   @Override
   public void prepareForWrite(final RecordConsumer recordConsumer) {
-    writer = new DataWritableWriter(recordConsumer, schema);
+    writer = new DataWritableWriter(recordConsumer, schema, hiveType);
   }
 
   @Override
-  public void write(final ParquetHiveRecord record) {
+  public void write(final RowInBatch record) {
     writer.write(record);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/0f56aaad/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java
index 220e452..bb2e1eb 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriter.java
@@ -13,20 +13,23 @@
  */
 package org.apache.orc.bench.parquet;
 
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.Type;
 
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Map;
+import java.util.List;
 
 /**
  *
@@ -55,10 +58,13 @@ public class DataWritableWriter {
 
   /**
    * It writes a record to Parquet.
+   *
    * @param record Contains the record that is going to be written.
    */
   public void write(final RowInBatch record) {
-      messageWriter.write(record);
+    for(int r=0; r < record.batch.size; ++r) {
+      messageWriter.write(record, r);
+    }
   }
 
   private MessageDataWriter createMessageWriter(TypeDescription hiveType,
@@ -69,8 +75,9 @@ public class DataWritableWriter {
   /**
   * Creates a writer for the specific object inspector. The returned writer will be used
    * to call Parquet API for the specific data type.
+   *
    * @param hiveType The type description used to get the correct value type.
-   * @param type Type that contains information about the type schema.
+   * @param type     Type that contains information about the type schema.
   * @return A ParquetWriter object used to call the Parquet API for the specific data type.
    */
   private DataWriter createWriter(TypeDescription hiveType, Type type) {
@@ -78,9 +85,9 @@ public class DataWritableWriter {
       case BOOLEAN:
         return new BooleanDataWriter(hiveType);
       case BYTE:
-        return new ByteDataWriter(hiveType);
+        return new IntDataWriter(hiveType);
       case SHORT:
-        return new ShortDataWriter(hiveType);
+        return new IntDataWriter(hiveType);
       case INT:
         return new IntDataWriter(hiveType);
       case LONG:
@@ -94,9 +101,9 @@ public class DataWritableWriter {
       case CHAR:
         return new CharDataWriter(hiveType);
       case VARCHAR:
-        return new VarcharDataWriter(hiveType);
+        return new StringDataWriter(hiveType);
       case BINARY:
-        return new BinaryDataWriter(hiveType);
+        return new StringDataWriter(hiveType);
       case TIMESTAMP:
         return new TimestampDataWriter(hiveType);
       case DECIMAL:
@@ -104,59 +111,14 @@ public class DataWritableWriter {
       case DATE:
         return new DateDataWriter(hiveType);
       case LIST:
+        return new ListDataWriter(hiveType, type.asGroupType());
       case MAP:
+        return new MapDataWriter(hiveType, type.asGroupType());
       case STRUCT:
+        return new StructDataWriter(hiveType, type.asGroupType());
       default:
         throw new IllegalArgumentException("Unhandled type " + hiveType);
     }
-    if (type.isPrimitive()) {
-      PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector)inspector;
-      switch (primitiveInspector.getPrimitiveCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case FLOAT:
-        case DOUBLE:
-        case STRING:
-        case CHAR:
-        case VARCHAR:
-        case BINARY:
-        case TIMESTAMP:
-        case DECIMAL:
-        case DATE:
-        default:
-          throw new IllegalArgumentException("Unsupported primitive data type: 
" + primitiveInspector.getPrimitiveCategory());
-      }
-    } else {
-      GroupType groupType = type.asGroupType();
-      OriginalType originalType = type.getOriginalType();
-
-      if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
-        return new ListDataWriter((ListObjectInspector)inspector, groupType);
-      } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
-        return new MapDataWriter((MapObjectInspector)inspector, groupType);
-      } else {
-        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
-        return new StructDataWriter((StructObjectInspector)inspector, groupType);
-      }
-    }
-  }
-
-  /**
-   * Checks that an inspector matches the category indicated as a parameter.
-   * @param inspector The object inspector to check
-   * @param category The category to match
-   * @throws IllegalArgumentException if inspector does not match the category
-   */
-  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
-    if (!inspector.getCategory().equals(category)) {
-      throw new IllegalArgumentException("Invalid data type: expected " + 
category
-          + " type, but found: " + inspector.getCategory());
-    }
   }
 
   abstract class DataWriter {
@@ -166,80 +128,72 @@ public class DataWritableWriter {
       id = type.getId();
     }
 
-    abstract void write(RowInBatch value);
+    abstract void write(RowInBatch value, int row);
   }
 
-  private class GroupDataWriter implements DataWriter {
-    private StructObjectInspector inspector;
-    private List<? extends StructField> structFields;
+  private class GroupDataWriter extends DataWriter {
+    private String[] fieldNames;
     private DataWriter[] structWriters;
 
-    public GroupDataWriter(StructObjectInspector inspector, GroupType groupType) {
-      this.inspector = inspector;
+    public GroupDataWriter(TypeDescription inspector, GroupType groupType) {
+      super(inspector);
 
-      structFields = this.inspector.getAllStructFieldRefs();
-      structWriters = new DataWriter[structFields.size()];
+      List<TypeDescription> children = inspector.getChildren();
+      structWriters = new DataWriter[children.size()];
+      fieldNames = new String[children.size()];
+      List<String> childrenNames = inspector.getFieldNames();
 
-      for (int i = 0; i < structFields.size(); i++) {
-        StructField field = structFields.get(i);
-        structWriters[i] = createWriter(field.getFieldObjectInspector(), groupType.getType(i));
+      for (int i = 0; i < children.size(); i++) {
+        fieldNames[i] = childrenNames.get(i);
+        structWriters[i] = createWriter(children.get(i), groupType.getType(i));
       }
     }
 
     @Override
-    public void write(Object value) {
-      for (int i = 0; i < structFields.size(); i++) {
-        StructField field = structFields.get(i);
-        Object fieldValue = inspector.getStructFieldData(value, field);
-
-        if (fieldValue != null) {
-          String fieldName = field.getFieldName();
-          DataWriter writer = structWriters[i];
-
-          recordConsumer.startField(fieldName, i);
-          writer.write(fieldValue);
-          recordConsumer.endField(fieldName, i);
-        }
+    public void write(RowInBatch value, int row) {
+      for (int i = 0; i < structWriters.length; i++) {
+        recordConsumer.startField(fieldNames[i], i);
+        structWriters[i].write(value, row);
+        recordConsumer.endField(fieldNames[i], i);
       }
     }
   }
 
-  private class MessageDataWriter extends GroupDataWriter implements DataWriter {
-    public MessageDataWriter(StructObjectInspector inspector, GroupType groupType) {
+  private class MessageDataWriter extends GroupDataWriter {
+    public MessageDataWriter(TypeDescription inspector, GroupType groupType) {
       super(inspector, groupType);
     }
 
     @Override
-    public void write(Object value) {
+    public void write(RowInBatch value, int row) {
       recordConsumer.startMessage();
       if (value != null) {
-        super.write(value);
+        super.write(value, row);
       }
       recordConsumer.endMessage();
     }
   }
 
-  private class StructDataWriter extends GroupDataWriter implements DataWriter {
-    public StructDataWriter(StructObjectInspector inspector, GroupType groupType) {
+  private class StructDataWriter extends GroupDataWriter {
+    public StructDataWriter(TypeDescription inspector, GroupType groupType) {
       super(inspector, groupType);
     }
 
     @Override
-    public void write(Object value) {
+    public void write(RowInBatch value, int row) {
       recordConsumer.startGroup();
-      super.write(value);
+      super.write(value, row);
       recordConsumer.endGroup();
     }
   }
 
-  private class ListDataWriter implements DataWriter {
-    private ListObjectInspector inspector;
+  private class ListDataWriter extends DataWriter {
     private String elementName;
     private DataWriter elementWriter;
     private String repeatedGroupName;
 
-    public ListDataWriter(ListObjectInspector inspector, GroupType groupType) {
-      this.inspector = inspector;
+    public ListDataWriter(TypeDescription inspector, GroupType groupType) {
+      super(inspector);
 
       // Get the internal array structure
       GroupType repeatedType = groupType.getType(0).asGroupType();
@@ -248,43 +202,45 @@ public class DataWritableWriter {
       Type elementType = repeatedType.getType(0);
       this.elementName = elementType.getName();
 
-      ObjectInspector elementInspector = this.inspector.getListElementObjectInspector();
-      this.elementWriter = createWriter(elementInspector, elementType);
+      this.elementWriter = createWriter(inspector.getChildren().get(0),
+          elementType);
     }
 
     @Override
-    public void write(Object value) {
-      recordConsumer.startGroup();
-      int listLength = inspector.getListLength(value);
-
-      if (listLength > 0) {
-        recordConsumer.startField(repeatedGroupName, 0);
-
-        for (int i = 0; i < listLength; i++) {
-          Object element = inspector.getListElement(value, i);
-          recordConsumer.startGroup();
-          if (element != null) {
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.startGroup();
+        ListColumnVector tv = (ListColumnVector) value.columns[id];
+        int listLength = (int) tv.lengths[row];
+
+        if (listLength > 0) {
+          recordConsumer.startField(repeatedGroupName, 0);
+          int start = (int) tv.offsets[row];
+          for (int i = 0; i < listLength; i++) {
+            recordConsumer.startGroup();
             recordConsumer.startField(elementName, 0);
-            elementWriter.write(element);
+            elementWriter.write(value, start + i);
             recordConsumer.endField(elementName, 0);
+            recordConsumer.endGroup();
           }
-          recordConsumer.endGroup();
-        }
 
-        recordConsumer.endField(repeatedGroupName, 0);
+          recordConsumer.endField(repeatedGroupName, 0);
+        }
+        recordConsumer.endGroup();
       }
-      recordConsumer.endGroup();
     }
   }
 
-  private class MapDataWriter implements DataWriter {
-    private MapObjectInspector inspector;
+  private class MapDataWriter extends DataWriter {
     private String repeatedGroupName;
     private String keyName, valueName;
     private DataWriter keyWriter, valueWriter;
 
-    public MapDataWriter(MapObjectInspector inspector, GroupType groupType) {
-      this.inspector = inspector;
+    public MapDataWriter(TypeDescription inspector, GroupType groupType) {
+      super(inspector);
 
       // Get the internal map structure (MAP_KEY_VALUE)
       GroupType repeatedType = groupType.getType(0).asGroupType();
@@ -292,40 +248,39 @@ public class DataWritableWriter {
 
       // Get key element information
       Type keyType = repeatedType.getType(0);
-      ObjectInspector keyInspector = this.inspector.getMapKeyObjectInspector();
+      TypeDescription keyInspector = inspector.getChildren().get(0);
       this.keyName = keyType.getName();
       this.keyWriter = createWriter(keyInspector, keyType);
 
       // Get value element information
       Type valuetype = repeatedType.getType(1);
-      ObjectInspector valueInspector = this.inspector.getMapValueObjectInspector();
+      TypeDescription valueInspector = inspector.getChildren().get(1);
       this.valueName = valuetype.getName();
       this.valueWriter = createWriter(valueInspector, valuetype);
     }
 
     @Override
-    public void write(Object value) {
-      recordConsumer.startGroup();
-
-      Map<?, ?> mapValues = inspector.getMap(value);
-      if (mapValues != null && mapValues.size() > 0) {
-        recordConsumer.startField(repeatedGroupName, 0);
-        for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
-          recordConsumer.startGroup();
-          if (keyValue != null) {
-            // write key element
-            Object keyElement = keyValue.getKey();
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.startGroup();
+        MapColumnVector tv = (MapColumnVector) value.columns[id];
+        int start = (int) tv.offsets[row];
+        int length = (int) tv.lengths[row];
+        if (length > 0) {
+          recordConsumer.startField(repeatedGroupName, 0);
+          for (int i=0; i < length; ++i) {
+            recordConsumer.startGroup();
             recordConsumer.startField(keyName, 0);
-            keyWriter.write(keyElement);
+            keyWriter.write(value, start + i);
             recordConsumer.endField(keyName, 0);
 
             // write value element
-            Object valueElement = keyValue.getValue();
-            if (valueElement != null) {
-              recordConsumer.startField(valueName, 1);
-              valueWriter.write(valueElement);
-              recordConsumer.endField(valueName, 1);
-            }
+            recordConsumer.startField(valueName, 1);
+            valueWriter.write(value, start + i);
+            recordConsumer.endField(valueName, 1);
           }
           recordConsumer.endGroup();
         }
@@ -336,186 +291,173 @@ public class DataWritableWriter {
     }
   }
 
-  private class BooleanDataWriter implements DataWriter {
-    private BooleanObjectInspector inspector;
-
-    public BooleanDataWriter(BooleanObjectInspector inspector) {
-      this.inspector = inspector;
-    }
-
-    @Override
-    public void write(Object value) {
-      recordConsumer.addBoolean(inspector.get(value));
-    }
-  }
-
-  private class ByteDataWriter implements DataWriter {
-    private ByteObjectInspector inspector;
-
-    public ByteDataWriter(ByteObjectInspector inspector) {
-      this.inspector = inspector;
-    }
-
-    @Override
-    public void write(Object value) {
-      recordConsumer.addInteger(inspector.get(value));
-    }
-  }
+  private class BooleanDataWriter extends DataWriter {
 
-  private class ShortDataWriter implements DataWriter {
-    private ShortObjectInspector inspector;
-    public ShortDataWriter(ShortObjectInspector inspector) {
-      this.inspector = inspector;
+    public BooleanDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      recordConsumer.addInteger(inspector.get(value));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addBoolean(((LongColumnVector) value.columns[id]).vector[row] != 0);
+      }
     }
   }
 
-  private class IntDataWriter implements DataWriter {
+  private class IntDataWriter extends DataWriter {
 
     public IntDataWriter(TypeDescription inspector) {
-      this.inspector = inspector;
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      recordConsumer.addInteger(inspector.get(value));
-    }
-  }
-
-  private class LongDataWriter implements DataWriter {
-    private LongObjectInspector inspector;
-
-    public LongDataWriter(LongObjectInspector inspector) {
-      this.inspector = inspector;
-    }
-
-    @Override
-    public void write(Object value) {
-      recordConsumer.addLong(inspector.get(value));
-    }
-  }
-
-  private class FloatDataWriter implements DataWriter {
-    private FloatObjectInspector inspector;
-
-    public FloatDataWriter(FloatObjectInspector inspector) {
-      this.inspector = inspector;
-    }
-
-    @Override
-    public void write(Object value) {
-      recordConsumer.addFloat(inspector.get(value));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addInteger((int) ((LongColumnVector) value.columns[id]).vector[row]);
+      }
     }
   }
 
-  private class DoubleDataWriter implements DataWriter {
-    private DoubleObjectInspector inspector;
+  private class LongDataWriter extends DataWriter {
 
-    public DoubleDataWriter(DoubleObjectInspector inspector) {
-      this.inspector = inspector;
+    public LongDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      recordConsumer.addDouble(inspector.get(value));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addLong(((LongColumnVector) value.columns[id]).vector[row]);
+      }
     }
   }
 
-  private class StringDataWriter implements DataWriter {
-    private StringObjectInspector inspector;
+  private class FloatDataWriter extends DataWriter {
 
-    public StringDataWriter(StringObjectInspector inspector) {
-      this.inspector = inspector;
+    public FloatDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      String v = inspector.getPrimitiveJavaObject(value);
-      recordConsumer.addBinary(Binary.fromString(v));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addFloat((float) ((DoubleColumnVector) value.columns[id]).vector[row]);
+      }
     }
   }
 
-  private class CharDataWriter implements DataWriter {
-    private HiveCharObjectInspector inspector;
+  private class DoubleDataWriter extends DataWriter {
 
-    public CharDataWriter(HiveCharObjectInspector inspector) {
-      this.inspector = inspector;
+    public DoubleDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      String v = inspector.getPrimitiveJavaObject(value).getStrippedValue();
-      recordConsumer.addBinary(Binary.fromString(v));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addDouble(((DoubleColumnVector) value.columns[id]).vector[row]);
+      }
     }
   }
 
-  private class VarcharDataWriter implements DataWriter {
-    private HiveVarcharObjectInspector inspector;
+  private class StringDataWriter extends DataWriter {
 
-    public VarcharDataWriter(HiveVarcharObjectInspector inspector) {
-      this.inspector = inspector;
+    public StringDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      String v = inspector.getPrimitiveJavaObject(value).getValue();
-      recordConsumer.addBinary(Binary.fromString(v));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        BytesColumnVector tv = (BytesColumnVector) value.columns[id];
+        Binary buffer = Binary.fromReusedByteArray(tv.vector[row],
+            tv.start[row], tv.length[row]);
+        recordConsumer.addBinary(buffer);
+      }
     }
   }
 
-  private class BinaryDataWriter implements DataWriter {
-    private BinaryObjectInspector inspector;
+  private class CharDataWriter extends DataWriter {
 
-    public BinaryDataWriter(BinaryObjectInspector inspector) {
-      this.inspector = inspector;
+    public CharDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      byte[] vBinary = inspector.getPrimitiveJavaObject(value);
-      recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        BytesColumnVector tv = (BytesColumnVector) value.columns[id];
+        recordConsumer.addBinary(Binary.fromString(tv.toString(row).trim()));
+      }
     }
   }
 
-  private class TimestampDataWriter implements DataWriter {
-    private TimestampObjectInspector inspector;
-
-    public TimestampDataWriter(TimestampObjectInspector inspector) {
-      this.inspector = inspector;
+  private class TimestampDataWriter extends DataWriter {
+    public TimestampDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      Timestamp ts = inspector.getPrimitiveJavaObject(value);
-      recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        TimestampColumnVector tv = (TimestampColumnVector) value.columns[id];
+        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(tv.asScratchTimestamp(row), false).toBinary());
+      }
     }
   }
 
-  private class DecimalDataWriter implements DataWriter {
-    private HiveDecimalObjectInspector inspector;
+  private class DecimalDataWriter extends DataWriter {
+    private final TypeDescription schema;
 
-    public DecimalDataWriter(HiveDecimalObjectInspector inspector) {
-      this.inspector = inspector;
+    public DecimalDataWriter(TypeDescription inspector) {
+      super(inspector);
+      schema = inspector;
     }
 
     @Override
-    public void write(Object value) {
-      HiveDecimal vDecimal = inspector.getPrimitiveJavaObject(value);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
-      recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        DecimalColumnVector tv = (DecimalColumnVector) value.columns[id];
+        recordConsumer.addBinary(decimalToBinary(tv.vector[row].getHiveDecimal(), schema));
+      }
     }
 
-    private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
-      int prec = decimalTypeInfo.precision();
-      int scale = decimalTypeInfo.scale();
+    private Binary decimalToBinary(final HiveDecimal hiveDecimal,
+                                   final TypeDescription decimalTypeInfo) {
+      int prec = decimalTypeInfo.getPrecision();
+      int scale = decimalTypeInfo.getScale();
       byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
 
       // Estimated number of bytes needed.
-      int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+      int precToBytes = HiveSchemaConverter.PRECISION_TO_BYTE_COUNT[prec - 1];
       if (precToBytes == decimalBytes.length) {
         // No padding needed.
         return Binary.fromByteArray(decimalBytes);
@@ -529,22 +471,26 @@ public class DataWritableWriter {
         }
       }
 
-      System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+      System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length,
+          decimalBytes.length); // Padding leading zeroes/ones.
       return Binary.fromByteArray(tgt);
     }
   }
 
-  private class DateDataWriter implements DataWriter {
-    private DateObjectInspector inspector;
+  private class DateDataWriter extends DataWriter {
 
-    public DateDataWriter(DateObjectInspector inspector) {
-      this.inspector = inspector;
+    public DateDataWriter(TypeDescription inspector) {
+      super(inspector);
     }
 
     @Override
-    public void write(Object value) {
-      Date vDate = inspector.getPrimitiveJavaObject(value);
-      recordConsumer.addInteger(DateWritable.dateToDays(vDate));
+    public void write(RowInBatch value, int row) {
+      if (value.columns[id].isRepeating) {
+        row = 0;
+      }
+      if (value.columns[id].noNulls || !value.columns[id].isNull[row]) {
+        recordConsumer.addInteger((int) ((LongColumnVector) value.columns[id]).vector[row]);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/0f56aaad/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
index 29ae438..d553934 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
@@ -19,14 +19,11 @@
 package org.apache.orc.bench.parquet;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
 public class ParquetScan {
@@ -41,8 +38,7 @@ public class ParquetScan {
       FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
           new String[]{});
       RecordReader<NullWritable,ArrayWritable> recordReader =
-          new ParquetRecordReaderWrapper(inputFormat, split, conf,
-              Reporter.NULL);
+          new ParquetRecordReaderWrapper(inputFormat, split, conf);
       ArrayWritable value = recordReader.createValue();
       while (recordReader.next(nada, value)) {
         rowCount += 1;

http://git-wip-us.apache.org/repos/asf/orc/blob/0f56aaad/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java b/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java
index 60b4dfd..7bf1744 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/RowInBatch.java
@@ -14,20 +14,79 @@
 
 package org.apache.orc.bench.parquet;
 
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
 
+import java.util.List;
+
 /**
- * A value class that wraps a VectorizedRowBatch and a row index.
+ * A value class that wraps a VectorizedRowBatch so that we don't need to
+ * copy values into a separate object as we iterate through the rows.
  */
 public class RowInBatch {
   public final VectorizedRowBatch batch;
   public final TypeDescription schema;
-  public int row;
+  public final ColumnVector[] columns;
+
+  /**
+   * Build a mapping from the column id to the ColumnVector that stores
+   * its values.
+   * @param t The TypeDescription for this column
+   * @param v The ColumnVector for this column
+   */
+  private void fillInColumns(TypeDescription t, ColumnVector v) {
+    columns[t.getId()] = v;
+    switch (t.getCategory()) {
+      case LIST: {
+        ListColumnVector tv = (ListColumnVector) v;
+        fillInColumns(t.getChildren().get(0), tv.child);
+        break;
+      }
+      case MAP: {
+        MapColumnVector tv = (MapColumnVector) v;
+        fillInColumns(t.getChildren().get(0), tv.keys);
+        fillInColumns(t.getChildren().get(1), tv.values);
+        break;
+      }
+      case STRUCT: {
+        StructColumnVector tv= (StructColumnVector) v;
+        List<TypeDescription> children = t.getChildren();
+        for(int i=0; i < tv.fields.length; ++i) {
+          fillInColumns(children.get(i), tv.fields[i]);
+        }
+        break;
+      }
+      case UNION: {
+        UnionColumnVector tv= (UnionColumnVector) v;
+        List<TypeDescription> children = t.getChildren();
+        for(int i=0; i < tv.fields.length; ++i) {
+          fillInColumns(children.get(i), tv.fields[i]);
+        }
+        break;
+      }
+      default:
+        break;
+    }
+  }
 
   RowInBatch(TypeDescription schema) {
     this.schema = schema;
     batch = schema.createRowBatch();
-    row = 0;
+    columns = new ColumnVector[schema.getMaximumId() + 1];
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      List<TypeDescription> children = schema.getChildren();
+      for(int i=0; i < children.size(); ++i) {
+        fillInColumns(children.get(i), batch.cols[i]);
+      }
+    } else {
+      fillInColumns(schema, batch.cols[0]);
+    }
   }
 }
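
A small illustration of the column-id mapping that RowInBatch.fillInColumns() builds, using a hypothetical nested schema (the class and schema below are assumptions for the sketch only, not part of the commit):

package org.apache.orc.bench.parquet;

import org.apache.orc.TypeDescription;

// Illustrative only: shows the column-id -> ColumnVector mapping for a
// hypothetical nested schema. ORC numbers types in pre-order, so for
// struct<a:int,b:struct<c:string>> the ids are root=0, a=1, b=2, c=3.
public class RowInBatchIdSketch {
  public static void main(String[] args) {
    TypeDescription schema =
        TypeDescription.fromString("struct<a:int,b:struct<c:string>>");
    RowInBatch rib = new RowInBatch(schema);
    // rib.columns[1] == rib.batch.cols[0]  (LongColumnVector for a)
    // rib.columns[2] == rib.batch.cols[1]  (StructColumnVector for b)
    // rib.columns[3] is b's fields[0], a BytesColumnVector for c, so the
    // string writer can address the nested column directly by type id.
    System.out.println(rib.columns.length);  // maximum id + 1, i.e. 4
  }
}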
