Repository: hive
Updated Branches:
  refs/heads/master 2134bfc3c -> 0a4446e64


HIVE-14827: Micro benchmark for Parquet vectorized reader (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a4446e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a4446e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a4446e6

Branch: refs/heads/master
Commit: 0a4446e641748c33e8da6e5e54c69b3e25964464
Parents: 2134bfc
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Tue Feb 7 02:11:01 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Tue Feb 7 02:18:05 2017 +0800

----------------------------------------------------------------------
 .../benchmark/storage/ColumnarStorageBench.java | 76 +++++++++++++++++---
 1 file changed, 68 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0a4446e6/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java
----------------------------------------------------------------------
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java
index 3efe424..a14b790 100644
--- a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java
@@ -17,21 +17,31 @@ package org.apache.hive.benchmark.storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -49,6 +59,11 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
@@ -296,10 +311,36 @@ public class ColumnarStorageBench {
      return outputFormat.getHiveRecordWriter(jobConf, outputPath, null, false, recordProperties, null);
     }
 
-    public RecordReader getRecordReader(Path inputPath) throws IOException {
-      return inputFormat.getRecordReader(
-          new FileSplit(inputPath, 0, fileLength(inputPath), (String[]) null),
-          jobConf, null);
+    public RecordReader getRecordReader(Path inputPath) throws Exception {
+      if ("parquet".equals(format) || "orc".equals(format)) {
+        return inputFormat.getRecordReader(
+            new FileSplit(inputPath, 0, fileLength(inputPath), (String[]) null),
+            jobConf, null);
+      } else if ("parquet-vec".equals(format)) {
+        return getVectorizedRecordReader(inputPath);
+      } else {
+        throw new IllegalArgumentException("Invalid file format argument: " + format);
+      }
+    }
+
+    public RecordReader getVectorizedRecordReader(Path inputPath) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set(IOConstants.COLUMNS, getColumnNames(DEFAULT_COLUMN_TYPES));
+      conf.set(IOConstants.COLUMNS_TYPES, DEFAULT_COLUMN_TYPES);
+      // TODO: VectorizedParquetRecordReader doesn't support map and array types yet;
+      // the value of ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR should be
+      // updated once these data types are supported.
+      conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,1,2,3,6");
+      conf.set(ReadSupport.PARQUET_READ_SCHEMA, "test schema");
+      HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+      HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+      Job vectorJob = new Job(conf, "read vector");
+      ParquetInputFormat.setInputPaths(vectorJob, inputPath);
+      ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+      InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+      initialVectorizedRowBatchCtx(conf);
+      return new VectorizedParquetRecordReader(split, new JobConf(conf));
     }
   }
 
@@ -334,7 +375,7 @@ public class ColumnarStorageBench {
   }
 
   // Test different format types
-  @Param({"orc", "parquet"})
+  @Param({"orc", "parquet", "parquet-vec"})
   public String format;
 
   /**
@@ -345,7 +386,7 @@ public class ColumnarStorageBench {
    */
   @Setup(Level.Trial)
   public void prepareBenchmark() throws SerDeException, IOException {
-    if (format.equalsIgnoreCase("parquet")) {
+    if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("parquet-vec")) {
       storageFormatTest = new ParquetStorageFormatTest();
     } else if (format.equalsIgnoreCase("orc")) {
       storageFormatTest = new OrcStorageFormatTest();
@@ -373,6 +414,25 @@ public class ColumnarStorageBench {
     writer.close(false);
   }
 
+  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  private StructObjectInspector createStructObjectInspector(Configuration conf) {
+    // Create row related objects
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+
   /**
    * It deletes any temporary file created by prepareBenchmark.
    */
@@ -388,7 +448,7 @@ public class ColumnarStorageBench {
   * @throws IOException If it cannot write temporary files.
    */
   @Setup(Level.Invocation)
-  public void prepareInvocation() throws IOException {
+  public void prepareInvocation() throws Exception {
     recordWriterFile = createTempFile();
     recordWriterPath = new Path(recordWriterFile.getPath());
 
@@ -437,7 +497,7 @@ public class ColumnarStorageBench {
   }
 
   @Benchmark
-  public RecordReader getRecordReader() throws IOException {
+  public RecordReader getRecordReader() throws Exception {
     return storageFormatTest.getRecordReader(readPath);
   }
 
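For reference, a minimal sketch of exercising the new "parquet-vec" value through
the JMH runner API. The harness class below is illustrative and not part of this
commit; it assumes the hive-jmh benchmark classes are on the classpath.

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical harness: runs only the getRecordReader benchmark with the
    // vectorized Parquet format added by this patch.
    public class ParquetVecBenchRunner {
      public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder()
            .include("ColumnarStorageBench.getRecordReader")
            .param("format", "parquet-vec") // select the new @Param value
            .forks(1)
            .build();
        new Runner(opts).run();
      }
    }

The same selection should also work from the command line against the uber-jar
built by the itests/hive-jmh module, passing -p format=parquet-vec.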
