Repository: hive
Updated Branches:
  refs/heads/master 2134bfc3c -> 0a4446e64
HIVE-14827: Micro benchmark for Parquet vectorized reader (Colin Ma, reviewed by Ferdinand Xu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a4446e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a4446e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a4446e6

Branch: refs/heads/master
Commit: 0a4446e641748c33e8da6e5e54c69b3e25964464
Parents: 2134bfc
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Tue Feb 7 02:11:01 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Tue Feb 7 02:18:05 2017 +0800

----------------------------------------------------------------------
 .../benchmark/storage/ColumnarStorageBench.java | 76 +++++++++++++++++---
 1 file changed, 68 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/0a4446e6/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java
----------------------------------------------------------------------
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java index 3efe424..a14b790 100644 --- a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java +++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java @@ -17,21 +17,31 @@ package org.apache.hive.benchmark.storage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import 
org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -49,6 +59,11 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.GroupReadSupport; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.TearDown; @@ 
-296,10 +311,36 @@ public class ColumnarStorageBench { return outputFormat.getHiveRecordWriter(jobConf, outputPath, null, false, recordProperties, null); } - public RecordReader getRecordReader(Path inputPath) throws IOException { - return inputFormat.getRecordReader( - new FileSplit(inputPath, 0, fileLength(inputPath), (String[]) null), - jobConf, null); + public RecordReader getRecordReader(Path inputPath) throws Exception { + if ("parquet".equals(format) || "orc".equals(format)) { + return inputFormat.getRecordReader( + new FileSplit(inputPath, 0, fileLength(inputPath), (String[]) null), + jobConf, null); + } else if ("parquet-vec".equals(format)) { + return getVectorizedRecordReader(inputPath); + } else { + throw new IllegalArgumentException("Invalid file format argument: " + format); + } + } + + public RecordReader getVectorizedRecordReader(Path inputPath) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, getColumnNames(DEFAULT_COLUMN_TYPES)); + conf.set(IOConstants.COLUMNS_TYPES, DEFAULT_COLUMN_TYPES); + // TODO: VectorizedParquetRecordReader doesn't support map, array now, the value of + // ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR should be updated after support these data + // types. 
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,1,2,3,6"); + conf.set(ReadSupport.PARQUET_READ_SCHEMA, "test schema"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); + Job vectorJob = new Job(conf, "read vector"); + ParquetInputFormat.setInputPaths(vectorJob, inputPath); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); + InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + initialVectorizedRowBatchCtx(conf); + return new VectorizedParquetRecordReader(split, new JobConf(conf)); } } @@ -334,7 +375,7 @@ public class ColumnarStorageBench { } // Test different format types - @Param({"orc", "parquet"}) + @Param({"orc", "parquet", "parquet-vec"}) public String format; /** @@ -345,7 +386,7 @@ public class ColumnarStorageBench { */ @Setup(Level.Trial) public void prepareBenchmark() throws SerDeException, IOException { - if (format.equalsIgnoreCase("parquet")) { + if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("parquet-vec")) { storageFormatTest = new ParquetStorageFormatTest(); } else if (format.equalsIgnoreCase("orc")) { storageFormatTest = new OrcStorageFormatTest(); @@ -373,6 +414,25 @@ public class ColumnarStorageBench { writer.close(false); } + private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { + MapWork mapWork = new MapWork(); + VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(createStructObjectInspector(conf), new String[0]); + mapWork.setVectorMode(true); + mapWork.setVectorizedRowBatchCtx(rbCtx); + Utilities.setMapWork(conf, mapWork); + } + + private StructObjectInspector createStructObjectInspector(Configuration conf) { + // Create row related objects + String columnNames = conf.get(IOConstants.COLUMNS); + List<String> columnNamesList = 
DataWritableReadSupport.getColumnNames(columnNames); + String columnTypes = conf.get(IOConstants.COLUMNS_TYPES); + List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); + TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + } + /** * It deletes any temporary file created by prepareBenchmark. */ @@ -388,7 +448,7 @@ public class ColumnarStorageBench { * @throws IOException If it cannot writes temporary files. */ @Setup(Level.Invocation) - public void prepareInvocation() throws IOException { + public void prepareInvocation() throws Exception { recordWriterFile = createTempFile(); recordWriterPath = new Path(recordWriterFile.getPath()); @@ -437,7 +497,7 @@ public class ColumnarStorageBench { } @Benchmark - public RecordReader getRecordReader() throws IOException { + public RecordReader getRecordReader() throws Exception { return storageFormatTest.getRecordReader(readPath); }