It compiles
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/611388b5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/611388b5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/611388b5

Branch: refs/heads/orc-72
Commit: 611388b50f8f3e9deeaa0e5335f0afd7f0ab9422
Parents: 0f56aaa
Author: Owen O'Malley <omal...@apache.org>
Authored: Wed Oct 12 16:06:29 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Wed Oct 12 16:06:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/orc/bench/CompressionKind.java   |  17 ++-
 .../org/apache/orc/bench/FullReadBenchmark.java |  20 +--
 .../org/apache/orc/bench/GithubToParquet.java   |  34 ++---
 .../org/apache/orc/bench/SalesToParquet.java    |  34 +----
 .../org/apache/orc/bench/TaxiToParquet.java     |  49 ++-----
 .../java/org/apache/orc/bench/csv/CsvScan.java  |   4 +-
 .../bench/parquet/DataWritableReadSupport.java  |   2 -
 .../bench/parquet/HiveCollectionConverter.java  |   2 -
 .../parquet/MapredParquetOutputFormat.java      | 129 -------------------
 .../parquet/ParquetRecordReaderWrapper.java     |   2 -
 .../apache/orc/bench/parquet/ParquetWriter.java |  72 +++++++++++
 11 files changed, 118 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
index 9fe9ba9..86ac476 100644
--- a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
+++ b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
@@ -20,11 +20,13 @@ package org.apache.orc.bench;
 import io.airlift.compress.snappy.SnappyCodec;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 /**
- * Created by owen on 10/5/16.
+ * Enum for handling the compression codecs for the benchmark
  */
 public enum CompressionKind {
   NONE(""),
@@ -53,4 +55,17 @@ public enum CompressionKind {
       throw new IllegalArgumentException("Unhandled kind " + this);
     }
   }
+
+  public InputStream read(InputStream in) throws IOException {
+    switch (this) {
+      case NONE:
+        return in;
+      case ZLIB:
+        return new GZIPInputStream(in);
+      case SNAPPY:
+        return new SnappyCodec().createInputStream(in);
+      default:
+        throw new IllegalArgumentException("Unhandled kind " + this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
index 917707d..849e030 100644
--- a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
@@ -29,19 +29,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.TrackingLocalFileSystem;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.parquet.DataWritableReadSupport;
+import org.apache.orc.bench.parquet.ParquetRecordReaderWrapper;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-import io.airlift.compress.snappy.HadoopSnappyInputStream;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -63,7 +61,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
 
 @BenchmarkMode(Mode.AverageTime)
 @Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@@ -173,8 +170,7 @@ public class FullReadBenchmark {
     NullWritable nada = NullWritable.get();
     FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
     org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
-        new ParquetRecordReaderWrapper(inputFormat, split, conf,
-            Reporter.NULL);
+        new ParquetRecordReaderWrapper(inputFormat, split, conf);
     ArrayWritable value = recordReader.createValue();
     while (recordReader.next(nada, value)) {
       counters.records += 1;
@@ -193,14 +189,8 @@ public class FullReadBenchmark {
     FileSystem.Statistics statistics = fs.getLocalStatistics();
     statistics.reset();
     Path path = new Path("generated/" + Dataset + "-" + compression + ".json");
-    InputStream input = fs.open(path);
-    if ("zlib".equals(compression)) {
-      input = new GZIPInputStream(input);
-    } else if ("snappy".equals(compression)) {
-      input = new HadoopSnappyInputStream(input);
-    } else if (!"none".equals(compression)) {
-      throw new IllegalArgumentException("Unknown compression " + compression);
-    }
+    CompressionKind compress = CompressionKind.valueOf(compression);
+    InputStream input = compress.read(fs.open(path));
     JsonStreamParser parser = new JsonStreamParser(new InputStreamReader(input));
     while (parser.hasNext()) {

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
index b1678aa..5974883 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
@@ -19,17 +19,11 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
 import org.apache.orc.bench.json.JsonReader;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
 import java.util.Properties;
 
@@ -37,29 +31,17 @@ public class GithubToParquet {
   public static void main(String[] args) throws Exception {
     TypeDescription schema = Utilities.loadSchema("github.schema");
-    VectorizedRowBatch batch = schema.createRowBatch();
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", TaxiToParquet.getCodec(args[1]));
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          record.value = VectorToWritable.createValue(batch, r, schema,
-              record.value);
-          writer.write(record);
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf,
+        CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      for (String inFile : Utilities.sliceArray(args, 2)) {
+        JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
+        while (reader.nextBatch(batch)) {
+          writer.writeBatch();
+        }
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
index 3da900f..5768d0b 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
@@ -19,44 +19,24 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
-
-import java.util.Properties;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
 public class SalesToParquet {
 
   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", TaxiToParquet.getCodec(args[1]));
     SalesGenerator sales = new SalesGenerator(Long.parseLong(args[2]));
-    TypeDescription schema = sales.getSchema();
-    VectorizedRowBatch batch = schema.createRowBatch();
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    while (sales.nextBatch(batch)) {
-      for(int r=0; r < batch.size; ++r) {
-        record.value = VectorToWritable.createValue(batch, r, schema,
-            record.value);
-        writer.write(record);
+    TypeDescription schema = sales.getSchema();
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf,
+        CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      while (sales.nextBatch(batch)) {
+        writer.writeBatch();
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
index 3eafc87..9703d46 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
@@ -19,59 +19,26 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
 import org.apache.orc.bench.csv.CsvReader;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
-import java.util.Properties;
 
 public class TaxiToParquet {
 
-  public static String getCodec(String name) {
-    if ("none".equals(name)) {
-      return "UNCOMPRESSED";
-    } else if ("zlib".equals(name)) {
-      return "GZIP";
-    } else if ("snappy".equals(name)) {
-      return "SNAPPY";
-    } else {
-      throw new IllegalArgumentException("Unknown compression " + name);
-    }
-  }
-
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
-    VectorizedRowBatch batch = schema.createRowBatch();
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", getCodec(args[1]));
-    boolean isCompressed = true;
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          record.value = VectorToWritable.createValue(batch, r, schema,
-              record.value);
-          writer.write(record);
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf, CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      for(String inFile: Utilities.sliceArray(args, 2)) {
+        CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
+        while (reader.nextBatch(batch)) {
+          writer.writeBatch();
+        }
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
index ae78cc4..d1b372a 100644
--- a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.TaxiToOrc;
+import org.apache.orc.bench.Utilities;
 
 public class CsvScan {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     long rowCount = 0;
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     for(String filename: args) {
       CsvReader reader = new CsvReader(new Path(filename), conf, schema);
       VectorizedRowBatch batch = schema.createRowBatch();

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
index 0bcce1f..37f2d75 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
@@ -14,10 +14,8 @@
 package org.apache.orc.bench.parquet;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
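
As a usage sketch of the consolidated codec handling above (not part of this
commit: the class name, file path, and codec choice here are illustrative),
a caller can now let the CompressionKind enum wrap the raw stream instead of
hand-rolling the if/else ladder that FullReadBenchmark just dropped:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.bench.CompressionKind;

    public class CompressionKindDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        // Hypothetical gzip-compressed JSON input file.
        Path path = new Path("generated/taxi-ZLIB.json");
        // read() wraps the stream with the matching decompressor:
        // NONE passes it through, ZLIB uses GZIPInputStream, and
        // SNAPPY uses airlift's SnappyCodec.
        InputStream input = CompressionKind.ZLIB.read(fs.open(path));
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(input))) {
          System.out.println(reader.readLine());
        }
      }
    }
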
http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
index a8834aa..0623917 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
@@ -30,7 +30,6 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.Type;
 
 public class HiveCollectionConverter extends HiveGroupConverter {
-  private final GroupType collectionType;
   private final ConverterParent parent;
   private final int index;
   private final Converter innerConverter;
@@ -54,7 +53,6 @@ public class HiveCollectionConverter extends HiveGroupConverter {
       ConverterParent parent, int index, boolean isMap,
       TypeDescription hiveTypeInfo) {
     setMetadata(parent.getMetadata());
-    this.collectionType = collectionType;
     this.parent = parent;
     this.index = index;
     Type repeatedType = collectionType.getType(0);

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
deleted file mode 100644
index bfb48a9..0000000
--- a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.parquet;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
-import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
-import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.util.Progressable;
-
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-
-/**
- *
- * A Parquet OutputFormat for Hive (with the deprecated package mapred)
- *
- */
-public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
-    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MapredParquetOutputFormat.class);
-
-  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
-
-  public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
-  }
-
-  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
-  }
-
-  @Override
-  public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
-    realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
-  }
-
-  @Override
-  public RecordWriter<NullWritable, ParquetHiveRecord> getRecordWriter(
-      final FileSystem ignored,
-      final JobConf job,
-      final String name,
-      final Progressable progress
-      ) throws IOException {
-    throw new RuntimeException("Should never be used");
-  }
-
-  /**
-   *
-   * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
-   * contains the real output format
-   */
-  @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
-      final JobConf jobConf,
-      final Path finalOutPath,
-      final Class<? extends Writable> valueClass,
-      final boolean isCompressed,
-      final Properties tableProperties,
-      final Progressable progress) throws IOException {
-
-    LOG.info("creating new record writer..." + this);
-
-    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
-    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
-    List<String> columnNames;
-    List<TypeInfo> columnTypes;
-
-    if (columnNameProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNameProperty.split(","));
-    }
-
-    if (columnTypeProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    }
-
-    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
-
-    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
-        progress,tableProperties);
-  }
-
-  protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
-      JobConf jobConf,
-      String finalOutPath,
-      Progressable progress,
-      Properties tableProperties
-      ) throws IOException {
-    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
-        progress,tableProperties);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
index fdbc689..5eb3095 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
@@ -47,8 +47,6 @@ import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 
-import com.google.common.base.Strings;
-
 public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> {
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class);

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
new file mode 100644
index 0000000..4ba67c9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+public class ParquetWriter implements AutoCloseable {
+  private final RecordWriter<Void, RowInBatch> writer;
+  private final TypeDescription schema;
+  private final RowInBatch batch;
+
+  public ParquetWriter(Path path,
+                       TypeDescription schema,
+                       Configuration conf,
+                       CompressionKind compression
+                       ) throws IOException, InterruptedException {
+    this.schema = schema;
+    this.batch = new RowInBatch(schema);
+    writer = new ParquetOutputFormat<RowInBatch>(new DataWritableWriteSupport())
+        .getRecordWriter(conf, path, getCodec(compression));
+  }
+
+  public VectorizedRowBatch getBatch() {
+    return batch.batch;
+  }
+
+  public void writeBatch() throws IOException, InterruptedException {
+    writer.write(null, batch);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    writer.close(null);
+  }
+
+  public static CompressionCodecName getCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return CompressionCodecName.UNCOMPRESSED;
+      case ZLIB:
+        return CompressionCodecName.GZIP;
+      case SNAPPY:
+        return CompressionCodecName.SNAPPY;
+      default:
+        throw new IllegalArgumentException("Unsupported codec " + kind);
+    }
+  }
+}
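
For reference, a minimal standalone caller of the new ParquetWriter, in the
same try-with-resources pattern the converters above use. The schema string,
output path, and row-filling loop are placeholders for illustration, not part
of this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.bench.CompressionKind;
    import org.apache.orc.bench.parquet.ParquetWriter;

    public class ParquetWriterDemo {
      public static void main(String[] args) throws Exception {
        // Placeholder schema; the converters load theirs from *.schema files.
        TypeDescription schema = TypeDescription.fromString("struct<x:bigint>");
        JobConf conf = new JobConf();
        try (ParquetWriter writer = new ParquetWriter(new Path("demo.parquet"),
                                                      schema, conf,
                                                      CompressionKind.SNAPPY)) {
          VectorizedRowBatch batch = writer.getBatch();
          LongColumnVector x = (LongColumnVector) batch.cols[0];
          for (int r = 0; r < 10; ++r) {
            x.vector[r] = r;
          }
          batch.size = 10;
          writer.writeBatch();  // writes the rows currently in the batch
        }
      }
    }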