It compiles

Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/611388b5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/611388b5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/611388b5

Branch: refs/heads/orc-72
Commit: 611388b50f8f3e9deeaa0e5335f0afd7f0ab9422
Parents: 0f56aaa
Author: Owen O'Malley <omal...@apache.org>
Authored: Wed Oct 12 16:06:29 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Wed Oct 12 16:06:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/orc/bench/CompressionKind.java   |  17 ++-
 .../org/apache/orc/bench/FullReadBenchmark.java |  20 +--
 .../org/apache/orc/bench/GithubToParquet.java   |  34 ++---
 .../org/apache/orc/bench/SalesToParquet.java    |  34 +----
 .../org/apache/orc/bench/TaxiToParquet.java     |  49 ++-----
 .../java/org/apache/orc/bench/csv/CsvScan.java  |   4 +-
 .../bench/parquet/DataWritableReadSupport.java  |   2 -
 .../bench/parquet/HiveCollectionConverter.java  |   2 -
 .../parquet/MapredParquetOutputFormat.java      | 129 -------------------
 .../parquet/ParquetRecordReaderWrapper.java     |   2 -
 .../apache/orc/bench/parquet/ParquetWriter.java |  72 +++++++++++
 11 files changed, 118 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
index 9fe9ba9..86ac476 100644
--- a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
+++ b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
@@ -20,11 +20,13 @@ package org.apache.orc.bench;
 
 import io.airlift.compress.snappy.SnappyCodec;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 /**
- * Created by owen on 10/5/16.
+ * Enum for handling the compression codecs for the benchmark
  */
 public enum CompressionKind {
   NONE(""),
@@ -53,4 +55,17 @@ public enum CompressionKind {
         throw new IllegalArgumentException("Unhandled kind " + this);
     }
   }
+
+  public InputStream read(InputStream in) throws IOException {
+    switch (this) {
+      case NONE:
+        return in;
+      case ZLIB:
+        return new GZIPInputStream(in);
+      case SNAPPY:
+        return new SnappyCodec().createInputStream(in);
+      default:
+        throw new IllegalArgumentException("Unhandled kind " + this);
+    }
+  }
 }
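
For illustration, a minimal sketch of using the new read() helper; the ReadCompressed class and its argument layout are hypothetical, and valueOf() expects the exact enum constant name (NONE, ZLIB, or SNAPPY):

import org.apache.orc.bench.CompressionKind;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadCompressed {
  public static void main(String[] args) throws IOException {
    // args[0] = compressed input file, args[1] = codec name (NONE, ZLIB, SNAPPY)
    CompressionKind kind = CompressionKind.valueOf(args[1]);
    try (InputStream in = kind.read(new FileInputStream(args[0]))) {
      byte[] buffer = new byte[8192];
      long total = 0;
      int n;
      // drain the decompressed stream and count the bytes
      while ((n = in.read(buffer)) != -1) {
        total += n;
      }
      System.out.println("decompressed bytes: " + total);
    }
  }
}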

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
index 917707d..849e030 100644
--- a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
@@ -29,19 +29,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.TrackingLocalFileSystem;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.parquet.DataWritableReadSupport;
+import org.apache.orc.bench.parquet.ParquetRecordReaderWrapper;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-import io.airlift.compress.snappy.HadoopSnappyInputStream;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -63,7 +61,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
 
 @BenchmarkMode(Mode.AverageTime)
 @Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@@ -173,8 +170,7 @@ public class FullReadBenchmark {
     NullWritable nada = NullWritable.get();
     FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
     org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
-        new ParquetRecordReaderWrapper(inputFormat, split, conf,
-            Reporter.NULL);
+        new ParquetRecordReaderWrapper(inputFormat, split, conf);
     ArrayWritable value = recordReader.createValue();
     while (recordReader.next(nada, value)) {
       counters.records += 1;
@@ -193,14 +189,8 @@ public class FullReadBenchmark {
     FileSystem.Statistics statistics = fs.getLocalStatistics();
     statistics.reset();
     Path path = new Path("generated/" + Dataset + "-" + compression + ".json");
-    InputStream input = fs.open(path);
-    if ("zlib".equals(compression)) {
-      input = new GZIPInputStream(input);
-    } else if ("snappy".equals(compression)) {
-      input = new HadoopSnappyInputStream(input);
-    } else if (!"none".equals(compression)) {
-      throw new IllegalArgumentException("Unknown compression " + compression);
-    }
+    CompressionKind compress = CompressionKind.valueOf(compression);
+    InputStream input = compress.read(fs.open(path));
     JsonStreamParser parser =
         new JsonStreamParser(new InputStreamReader(input));
     while (parser.hasNext()) {

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
index b1678aa..5974883 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
@@ -19,17 +19,11 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
 import org.apache.orc.bench.json.JsonReader;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
 import java.util.Properties;
 
@@ -37,29 +31,17 @@ public class GithubToParquet {
 
   public static void main(String[] args) throws Exception {
     TypeDescription schema = Utilities.loadSchema("github.schema");
-    VectorizedRowBatch batch = schema.createRowBatch();
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", TaxiToParquet.getCodec(args[1]));
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          record.value = VectorToWritable.createValue(batch, r, schema,
-              record.value);
-          writer.write(record);
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf,
+        CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      for (String inFile : Utilities.sliceArray(args, 2)) {
+        JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
+        while (reader.nextBatch(batch)) {
+          writer.writeBatch();
         }
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
index 3da900f..5768d0b 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
@@ -19,44 +19,24 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
-
-import java.util.Properties;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
 public class SalesToParquet {
 
   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", TaxiToParquet.getCodec(args[1]));
     SalesGenerator sales = new SalesGenerator(Long.parseLong(args[2]));
-    TypeDescription schema = sales.getSchema();
-    VectorizedRowBatch batch = schema.createRowBatch();
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    while (sales.nextBatch(batch)) {
-      for(int r=0; r < batch.size; ++r) {
-        record.value = VectorToWritable.createValue(batch, r, schema,
-            record.value);
-        writer.write(record);
+    TypeDescription schema = sales.getSchema();
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf,
+        CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      while (sales.nextBatch(batch)) {
+        writer.writeBatch();
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
index 3eafc87..9703d46 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
@@ -19,59 +19,26 @@
 package org.apache.orc.bench;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.avro.AvroWriter;
 import org.apache.orc.bench.csv.CsvReader;
+import org.apache.orc.bench.parquet.ParquetWriter;
 
-import java.util.Properties;
 public class TaxiToParquet {
 
-  public static String getCodec(String name) {
-    if ("none".equals(name)) {
-      return "UNCOMPRESSED";
-    } else if ("zlib".equals(name)) {
-      return "GZIP";
-    } else if ("snappy".equals(name)) {
-      return "SNAPPY";
-    } else {
-      throw new IllegalArgumentException("Unknown compression " + name);
-    }
-  }
-
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
-    VectorizedRowBatch batch = schema.createRowBatch();
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     JobConf conf = new JobConf();
-    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
-    conf.set("parquet.compression", getCodec(args[1]));
-    boolean isCompressed = true;
     Path path = new Path(args[0]);
-    Properties properties = AvroWriter.setHiveSchema(schema);
-    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
-    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
-        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
-        Reporter.NULL);
-    ParquetHiveRecord record = new ParquetHiveRecord();
-    record.inspector =
-        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          record.value = VectorToWritable.createValue(batch, r, schema,
-              record.value);
-          writer.write(record);
+    try (ParquetWriter writer = new ParquetWriter(path, schema, conf, CompressionKind.valueOf(args[1]))) {
+      VectorizedRowBatch batch = writer.getBatch();
+      for(String inFile: Utilities.sliceArray(args, 2)) {
+        CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
+        while (reader.nextBatch(batch)) {
+          writer.writeBatch();
         }
       }
     }
-    writer.close(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
index ae78cc4..d1b372a 100644
--- a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.TaxiToOrc;
+import org.apache.orc.bench.Utilities;
 
 public class CsvScan {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     long rowCount = 0;
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     for(String filename: args) {
       CsvReader reader = new CsvReader(new Path(filename), conf, schema);
       VectorizedRowBatch batch = schema.createRowBatch();

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
index 0bcce1f..37f2d75 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
@@ -14,10 +14,8 @@
 package org.apache.orc.bench.parquet;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
index a8834aa..0623917 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
@@ -30,7 +30,6 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.Type;
 
 public class HiveCollectionConverter extends HiveGroupConverter {
-  private final GroupType collectionType;
   private final ConverterParent parent;
   private final int index;
   private final Converter innerConverter;
@@ -54,7 +53,6 @@ public class HiveCollectionConverter extends HiveGroupConverter {
                                   ConverterParent parent,
                                   int index, boolean isMap, TypeDescription hiveTypeInfo) {
     setMetadata(parent.getMetadata());
-    this.collectionType = collectionType;
     this.parent = parent;
     this.index = index;
     Type repeatedType = collectionType.getType(0);

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
deleted file mode 100644
index bfb48a9..0000000
--- a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.parquet;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
-import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
-import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
-import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.util.Progressable;
-
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-
-/**
- *
- * A Parquet OutputFormat for Hive (with the deprecated package mapred)
- *
- */
-public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
-    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MapredParquetOutputFormat.class);
-
-  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
-
-  public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
-  }
-
-  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
-  }
-
-  @Override
-  public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
-    realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
-  }
-
-  @Override
-  public RecordWriter<NullWritable, ParquetHiveRecord> getRecordWriter(
-      final FileSystem ignored,
-      final JobConf job,
-      final String name,
-      final Progressable progress
-      ) throws IOException {
-    throw new RuntimeException("Should never be used");
-  }
-
-  /**
-   *
-   * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
-   * contains the real output format
-   */
-  @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
-      final JobConf jobConf,
-      final Path finalOutPath,
-      final Class<? extends Writable> valueClass,
-      final boolean isCompressed,
-      final Properties tableProperties,
-      final Progressable progress) throws IOException {
-
-    LOG.info("creating new record writer..." + this);
-
-    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
-    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
-    List<String> columnNames;
-    List<TypeInfo> columnTypes;
-
-    if (columnNameProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNameProperty.split(","));
-    }
-
-    if (columnTypeProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    }
-
-    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
-
-    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
-            progress,tableProperties);
-  }
-
-  protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
-      JobConf jobConf,
-      String finalOutPath,
-      Progressable progress,
-      Properties tableProperties
-      ) throws IOException {
-    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
-            progress,tableProperties);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
index fdbc689..5eb3095 100644
--- a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
@@ -47,8 +47,6 @@ import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 
-import com.google.common.base.Strings;
-
 public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> {
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class);
 

http://git-wip-us.apache.org/repos/asf/orc/blob/611388b5/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
new file mode 100644
index 0000000..4ba67c9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetWriter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+public class ParquetWriter implements AutoCloseable {
+  private final RecordWriter<Void, RowInBatch> writer;
+  private final TypeDescription schema;
+  private final RowInBatch batch;
+
+  public ParquetWriter(Path path,
+                       TypeDescription schema,
+                       Configuration conf,
+                       CompressionKind compression
+                       ) throws IOException, InterruptedException {
+    this.schema = schema;
+    this.batch = new RowInBatch(schema);
+    writer = new ParquetOutputFormat<RowInBatch>(new DataWritableWriteSupport())
+        .getRecordWriter(conf, path, getCodec(compression));
+  }
+
+  public VectorizedRowBatch getBatch() {
+    return batch.batch;
+  }
+
+  public void writeBatch() throws IOException, InterruptedException {
+    writer.write(null, batch);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    writer.close(null);
+  }
+
+  public static CompressionCodecName getCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return CompressionCodecName.UNCOMPRESSED;
+      case ZLIB:
+        return CompressionCodecName.GZIP;
+      case SNAPPY:
+        return CompressionCodecName.SNAPPY;
+      default:
+        throw new IllegalArgumentException("Unsupported codec " + kind);
+    }
+  }
+}
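
For reference, a minimal standalone sketch of driving the new ParquetWriter with a hand-filled batch; the ParquetWriterExample class, its schema, and the explicit batch.reset() between writes are assumptions for illustration (the converter mains above simply refill the batch through their readers):

package org.apache.orc.bench;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.parquet.ParquetWriter;

import java.nio.charset.StandardCharsets;

public class ParquetWriterExample {
  public static void main(String[] args) throws Exception {
    // args[0] = output path, args[1] = codec name (NONE, ZLIB, SNAPPY)
    TypeDescription schema =
        TypeDescription.fromString("struct<x:bigint,y:string>");
    JobConf conf = new JobConf();
    // AutoCloseable lets try-with-resources replace the old writer.close(true)
    try (ParquetWriter writer = new ParquetWriter(new Path(args[0]), schema,
        conf, CompressionKind.valueOf(args[1]))) {
      VectorizedRowBatch batch = writer.getBatch();
      LongColumnVector x = (LongColumnVector) batch.cols[0];
      BytesColumnVector y = (BytesColumnVector) batch.cols[1];
      for (long row = 0; row < 10000; ++row) {
        int i = batch.size++;
        x.vector[i] = row;
        byte[] value = ("row-" + row).getBytes(StandardCharsets.UTF_8);
        y.setRef(i, value, 0, value.length);
        if (batch.size == batch.getMaxSize()) {
          writer.writeBatch();
          batch.reset();
        }
      }
      if (batch.size != 0) {
        writer.writeBatch();
      }
    }
  }
}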
