http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java b/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
new file mode 100644
index 0000000..13a822a
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator;
+
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+
+public class LeafFilterFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LeafFilterFactory.class);
+
+  class IntFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    /**
+     * @param op         consists of EQUALS, NULL_SAFE_EQUALS, LESS_THAN, LESS_THAN_EQUALS, IS_NULL
+     * @param literal    the literal value to compare against, or null
+     * @param columnName the column being filtered
+     * @return the Parquet FilterPredicate for this leaf
+     */
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object literal,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(intColumn(columnName), ((Number) literal).intValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(intColumn(columnName),
+            (literal == null) ? null : ((Number) literal).intValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(intColumn(columnName), ((Number) literal).intValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class LongFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(FilterApi.longColumn(columnName), ((Number) constant).longValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(FilterApi.longColumn(columnName),
+            (constant == null) ? null : ((Number) constant).longValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(FilterApi.longColumn(columnName),
+            ((Number) constant).longValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class FloatFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant, String columnName) {
+      switch (op) {
+      case LESS_THAN:
+        return lt(floatColumn(columnName), ((Number) constant).floatValue());
+      case IS_NULL:
+      case EQUALS:
+      case NULL_SAFE_EQUALS:
+        return eq(floatColumn(columnName),
+            (constant == null) ? null : ((Number) constant).floatValue());
+      case LESS_THAN_EQUALS:
+        return ltEq(FilterApi.floatColumn(columnName), ((Number) constant).floatValue());
+      default:
+        throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class DoubleFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(doubleColumn(columnName), ((Number) constant).doubleValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(doubleColumn(columnName),
+            (constant == null) ? null : ((Number) constant).doubleValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(FilterApi.doubleColumn(columnName),
+            ((Number) constant).doubleValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class BooleanFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) throws Exception{
+      switch (op) {
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(booleanColumn(columnName),
+            (constant == null) ? null : ((Boolean) constant).booleanValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class BinaryFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) throws Exception{
+      switch (op) {
+        case LESS_THAN:
+          return lt(binaryColumn(columnName), Binary.fromString((String) constant));
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(binaryColumn(columnName),
+            (constant == null) ? null : Binary.fromString((String) constant));
+        case LESS_THAN_EQUALS:
+          return ltEq(binaryColumn(columnName), Binary.fromString((String) constant));
+        default:
+          // should never be executed
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  /**
+   * Get the leaf filter builder by PredicateLeaf.Type; date, decimal and timestamp are not
+   * supported yet.
+   * @param type the PredicateLeaf.Type of the leaf
+   * @param parquetType the Parquet type of the column being filtered
+   * @return the matching builder, or null if the type is not supported
+   */
+  public FilterPredicateLeafBuilder getLeafFilterBuilderByType(PredicateLeaf.Type type,
+                                                               Type parquetType){
+    switch (type){
+      case LONG:
+        if (parquetType.asPrimitiveType().getPrimitiveTypeName() ==
+            PrimitiveType.PrimitiveTypeName.INT32) {
+          return new IntFilterPredicateLeafBuilder();
+        } else {
+          return new LongFilterPredicateLeafBuilder();
+        }
+      case FLOAT:
+        if (parquetType.asPrimitiveType().getPrimitiveTypeName() ==
+            PrimitiveType.PrimitiveTypeName.FLOAT) {
+          return new FloatFilterPredicateLeafBuilder();
+        } else {
+          return new DoubleFilterPredicateLeafBuilder();
+        }
+      case STRING:  // string, char, varchar
+        return new BinaryFilterPredicateLeafBuilder();
+      case BOOLEAN:
+        return new BooleanFilterPredicateLeafBuilder();
+      case DATE:
+      case DECIMAL:
+      case TIMESTAMP:
+      default:
+        LOG.debug("Conversion to Parquet FilterPredicate not supported for " + type);
+        return null;
+    }
+  }
+}

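A minimal usage sketch (not part of the patch) for LeafFilterFactory: pick a builder from the
SARG leaf type and the Parquet column type, then build the leaf predicate. "fileSchema" and the
column name "x" are assumptions for illustration; buildPredict is declared to throw, hence the
try/catch.

    LeafFilterFactory factory = new LeafFilterFactory();
    Type parquetType = fileSchema.getType("x");   // fileSchema is a Parquet MessageType
    FilterPredicateLeafBuilder builder =
        factory.getLeafFilterBuilderByType(PredicateLeaf.Type.LONG, parquetType);
    if (builder != null) {
      try {
        // x < 10; the factory already chose between the INT32 and INT64 builder above
        FilterPredicate lessThanTen =
            builder.buildPredict(PredicateLeaf.Operator.LESS_THAN, 10L, "x");
      } catch (Exception e) {
        // the operator is not supported for this type
      }
    }
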
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
new file mode 100644
index 0000000..bfb48a9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+
+/**
+ *
+ * A Parquet OutputFormat for Hive (with the deprecated package mapred)
+ *
+ */
+public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
+    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MapredParquetOutputFormat.class);
+
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
+
+  public MapredParquetOutputFormat() {
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
+  }
+
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
+  }
+
+  @Override
+  public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
+    realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
+  }
+
+  @Override
+  public RecordWriter<NullWritable, ParquetHiveRecord> getRecordWriter(
+      final FileSystem ignored,
+      final JobConf job,
+      final String name,
+      final Progressable progress
+      ) throws IOException {
+    throw new RuntimeException("Should never be used");
+  }
+
+  /**
+   *
+   * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
+   * contains the real output format
+   */
+  @Override
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      final JobConf jobConf,
+      final Path finalOutPath,
+      final Class<? extends Writable> valueClass,
+      final boolean isCompressed,
+      final Properties tableProperties,
+      final Progressable progress) throws IOException {
+
+    LOG.info("creating new record writer..." + this);
+
+    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
+    List<String> columnNames;
+    List<TypeInfo> columnTypes;
+
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
+
+    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+            progress,tableProperties);
+  }
+
+  protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
+      JobConf jobConf,
+      String finalOutPath,
+      Progressable progress,
+      Properties tableProperties
+      ) throws IOException {
+    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+            progress,tableProperties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
new file mode 100644
index 0000000..e2ee7a6
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+/**
+ * Provides a wrapper representing a parquet-timestamp, with methods to
+ * convert to and from binary.
+ */
+public class NanoTime {
+  private final int julianDay;
+  private final long timeOfDayNanos;
+  public static NanoTime fromBinary(Binary bytes) {
+    Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes");
+    ByteBuffer buf = bytes.toByteBuffer();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    long timeOfDayNanos = buf.getLong();
+    int julianDay = buf.getInt();
+    return new NanoTime(julianDay, timeOfDayNanos);
+  }
+
+  public NanoTime(int julianDay, long timeOfDayNanos) {
+    this.julianDay = julianDay;
+    this.timeOfDayNanos = timeOfDayNanos;
+  }
+
+  public int getJulianDay() {
+    return julianDay;
+  }
+
+  public long getTimeOfDayNanos() {
+    return timeOfDayNanos;
+  }
+
+  public Binary toBinary() {
+    ByteBuffer buf = ByteBuffer.allocate(12);
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    buf.putLong(timeOfDayNanos);
+    buf.putInt(julianDay);
+    buf.flip();
+    return Binary.fromByteBuffer(buf);
+  }
+
+  public void writeValue(RecordConsumer recordConsumer) {
+    recordConsumer.addBinary(toBinary());
+  }
+
+  @Override
+  public String toString() {
+    return "NanoTime{julianDay="+julianDay+", 
timeOfDayNanos="+timeOfDayNanos+"}";
+  }
+}

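A round-trip sketch (not part of the patch) for the 12-byte layout used above: eight bytes of
nanos-of-day followed by a four-byte Julian day, both little-endian. The literal values are
arbitrary examples.

    NanoTime original = new NanoTime(2457754, 3_600_000_000_000L); // Julian day, 1 hour in nanos
    Binary asBinary = original.toBinary();                         // 12 bytes, little-endian
    NanoTime restored = NanoTime.fromBinary(asBinary);
    // restored.getJulianDay() == 2457754 and restored.getTimeOfDayNanos() == 3_600_000_000_000L
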
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
new file mode 100644
index 0000000..ae3408c
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import jodd.datetime.JDateTime;
+
+/**
+ * Utilities for converting from java.sql.Timestamp to parquet timestamp.
+ * This utilizes the Jodd library.
+ */
+public class NanoTimeUtils {
+   static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
+   static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
+   static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+   static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+
+   private static final ThreadLocal<Calendar> parquetGMTCalendar = new ThreadLocal<Calendar>();
+   private static final ThreadLocal<Calendar> parquetLocalCalendar = new ThreadLocal<Calendar>();
+
+   private static Calendar getGMTCalendar() {
+     //Calendar.getInstance calculates the current-time needlessly, so cache an instance.
+     if (parquetGMTCalendar.get() == null) {
+       parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")));
+     }
+     return parquetGMTCalendar.get();
+   }
+
+   private static Calendar getLocalCalendar() {
+     if (parquetLocalCalendar.get() == null) {
+       parquetLocalCalendar.set(Calendar.getInstance());
+     }
+     return parquetLocalCalendar.get();
+   }
+
+   private static Calendar getCalendar(boolean skipConversion) {
+     Calendar calendar = skipConversion ? getLocalCalendar() : getGMTCalendar();
+     calendar.clear(); // Reset all fields before reusing this instance
+     return calendar;
+   }
+
+   public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
+
+     Calendar calendar = getCalendar(skipConversion);
+     calendar.setTime(ts);
+     int year = calendar.get(Calendar.YEAR);
+     if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) {
+       year = 1 - year;
+     }
+     JDateTime jDateTime = new JDateTime(year,
+       calendar.get(Calendar.MONTH) + 1,  // Calendar months are 0-based; JDateTime expects 1-based
+       calendar.get(Calendar.DAY_OF_MONTH));
+     int days = jDateTime.getJulianDayNumber();
+
+     long hour = calendar.get(Calendar.HOUR_OF_DAY);
+     long minute = calendar.get(Calendar.MINUTE);
+     long second = calendar.get(Calendar.SECOND);
+     long nanos = ts.getNanos();
+     long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute +
+         NANOS_PER_HOUR * hour;
+
+     return new NanoTime(days, nanosOfDay);
+   }
+
+   public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
+     int julianDay = nt.getJulianDay();
+     long nanosOfDay = nt.getTimeOfDayNanos();
+
+     long remainder = nanosOfDay;
+     julianDay += remainder / NANOS_PER_DAY;
+     remainder %= NANOS_PER_DAY;
+     if (remainder < 0) {
+       remainder += NANOS_PER_DAY;
+       julianDay--;
+     }
+
+     JDateTime jDateTime = new JDateTime((double) julianDay);
+     Calendar calendar = getCalendar(skipConversion);
+     calendar.set(Calendar.YEAR, jDateTime.getYear());
+     calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); // JDateTime months are 1-based; Calendar expects 0-based
+     calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay());
+
+     int hour = (int) (remainder / (NANOS_PER_HOUR));
+     remainder = remainder % (NANOS_PER_HOUR);
+     int minutes = (int) (remainder / (NANOS_PER_MINUTE));
+     remainder = remainder % (NANOS_PER_MINUTE);
+     int seconds = (int) (remainder / (NANOS_PER_SECOND));
+     long nanos = remainder % NANOS_PER_SECOND;
+
+     calendar.set(Calendar.HOUR_OF_DAY, hour);
+     calendar.set(Calendar.MINUTE, minutes);
+     calendar.set(Calendar.SECOND, seconds);
+     Timestamp ts = new Timestamp(calendar.getTimeInMillis());
+     ts.setNanos((int) nanos);
+     return ts;
+   }
+}

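A conversion sketch (not part of the patch), assuming the common case where both directions pass
skipConversion=false so the GMT calendar is used and the stored value does not depend on the
writer's local time zone.

    Timestamp ts = Timestamp.valueOf("2017-01-01 12:34:56.123456789");
    NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
    Timestamp back = NanoTimeUtils.getTimestamp(nt, false);
    // back is expected to equal ts, including the nanosecond component
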
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
new file mode 100644
index 0000000..a1d3006
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ParquetFilterPredicateConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetFilterPredicateConverter.class);
+
+  /**
+   * Translate the search argument to the filter predicate Parquet uses. It includes
+   * only the columns from the passed schema.
+   * @return the Parquet FilterPredicate for the sarg, or null if it cannot be translated
+   */
+  public static FilterPredicate toFilterPredicate(SearchArgument sarg, MessageType schema) {
+    Set<String> columns = null;
+    if (schema != null) {
+      columns = new HashSet<String>();
+      for (Type field : schema.getFields()) {
+        columns.add(field.getName());
+      }
+    }
+
+    return translate(sarg.getExpression(), sarg.getLeaves(), columns, schema);
+  }
+
+  private static FilterPredicate translate(ExpressionTree root,
+                                           List<PredicateLeaf> leaves,
+                                           Set<String> columns,
+                                           MessageType schema) {
+    FilterPredicate p = null;
+    switch (root.getOperator()) {
+      case OR:
+        for(ExpressionTree child: root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns, schema);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns, schema);
+            // constant means no filter, ignore it when it is null
+            if(right != null){
+              p = FilterApi.or(p, right);
+            }
+          }
+        }
+        return p;
+      case AND:
+        for(ExpressionTree child: root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns, schema);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns, schema);
+            // constant means no filter, ignore it when it is null
+            if(right != null){
+              p = FilterApi.and(p, right);
+            }
+          }
+        }
+        return p;
+      case NOT:
+        FilterPredicate op = translate(root.getChildren().get(0), leaves,
+            columns, schema);
+        if (op != null) {
+          return FilterApi.not(op);
+        } else {
+          return null;
+        }
+      case LEAF:
+        PredicateLeaf leaf = leaves.get(root.getLeaf());
+
+        // Only push down the leaf if its column exists in the file schema.
+        if (columns.contains(leaf.getColumnName())) {
+          Type parquetType = schema.getType(leaf.getColumnName());
+          return buildFilterPredicateFromPredicateLeaf(leaf, parquetType);
+        } else {
+          // Do not create predicate if the leaf is not on the passed schema.
+          return null;
+        }
+      case CONSTANT:
+        return null;// no filter will be executed for constant
+      default:
+        throw new IllegalStateException("Unknown operator: " +
+            root.getOperator());
+    }
+  }
+
+  private static FilterPredicate buildFilterPredicateFromPredicateLeaf
+      (PredicateLeaf leaf, Type parquetType) {
+    LeafFilterFactory leafFilterFactory = new LeafFilterFactory();
+    FilterPredicateLeafBuilder builder;
+    try {
+      builder = leafFilterFactory
+          .getLeafFilterBuilderByType(leaf.getType(), parquetType);
+      if (builder == null) {
+        return null;
+      }
+      if (isMultiLiteralsOperator(leaf.getOperator())) {
+        return builder.buildPredicate(leaf.getOperator(),
+            leaf.getLiteralList(),
+            leaf.getColumnName());
+      } else {
+        return builder
+            .buildPredict(leaf.getOperator(),
+                leaf.getLiteral(),
+                leaf.getColumnName());
+      }
+    } catch (Exception e) {
+      LOG.error("fail to build predicate filter leaf with errors" + e, e);
+      return null;
+    }
+  }
+
+  private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) {
+    return (op == PredicateLeaf.Operator.IN) ||
+        (op == PredicateLeaf.Operator.BETWEEN);
+  }
+}

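A translation sketch (not part of the patch), assuming Hive's SearchArgumentFactory builder API
and a Parquet MessageType named "fileSchema". Leaves whose columns are missing from the schema
(for example partition columns) are dropped, so the result may be null.

    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
          .lessThan("x", PredicateLeaf.Type.LONG, 10L)
          .equals("y", PredicateLeaf.Type.STRING, "hello")
        .end()
        .build();
    FilterPredicate predicate =
        ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
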
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
new file mode 100644
index 0000000..fdbc689
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import com.google.common.base.Strings;
+
+public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> {
+  public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class);
+
+  private final long splitLen; // for getPos()
+
+  private org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> realReader;
+  // expect realReader to return the same Key & Value objects (common case);
+  // this avoids extra serialization & deserialization of these objects
+  private ArrayWritable valueObj = null;
+  private boolean firstRecord = false;
+  private boolean eof = false;
+  private int schemaSize;
+  private boolean skipTimestampConversion = false;
+  private JobConf jobConf;
+  private List<BlockMetaData> filtedBlocks;
+
+  public ParquetRecordReaderWrapper(
+      final ParquetInputFormat<ArrayWritable> newInputFormat,
+      final InputSplit oldSplit,
+      final JobConf oldJobConf)
+          throws IOException, InterruptedException {
+    this.splitLen = oldSplit.getLength();
+
+    jobConf = oldJobConf;
+    final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
+    if (taskAttemptID == null) {
+      taskAttemptID = new TaskAttemptID();
+    }
+
+    // create a TaskInputOutputContext
+    Configuration conf = jobConf;
+
+    final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    if (split != null) {
+      try {
+        realReader = newInputFormat.createRecordReader(split, taskContext);
+        realReader.initialize(split, taskContext);
+
+        // read once to gain access to key and value objects
+        if (realReader.nextKeyValue()) {
+          firstRecord = true;
+          valueObj = realReader.getCurrentValue();
+        } else {
+          eof = true;
+        }
+      } catch (final InterruptedException e) {
+        throw new IOException(e);
+      }
+    } else {
+      realReader = null;
+      eof = true;
+    }
+    if (valueObj == null) { // Should initialize the value for createValue
+      valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]);
+    }
+  }
+
+  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
+    SearchArgument sarg = null; // ConvertAstToSearchArg.createFromConf(conf);
+    if (sarg == null) {
+      return null;
+    }
+
+    // Create the Parquet FilterPredicate without including columns that do not exist
+    // on the schema (such as partition columns).
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
+    if (p != null) {
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("PARQUET predicate push down generated.");
+      ParquetInputFormat.setFilterPredicate(conf, p);
+      return FilterCompat.get(p);
+    } else {
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("No PARQUET predicate push down is generated.");
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (realReader != null) {
+      realReader.close();
+    }
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return (long) (splitLen * getProgress());
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (realReader == null) {
+      return 1f;
+    } else {
+      try {
+        return realReader.getProgress();
+      } catch (final InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean next(final NullWritable key, final ArrayWritable value) throws IOException {
+    if (eof) {
+      return false;
+    }
+    try {
+      if (firstRecord) { // key & value are already read.
+        firstRecord = false;
+      } else if (!realReader.nextKeyValue()) {
+        eof = true; // strictly not required, just for consistency
+        return false;
+      }
+
+      final ArrayWritable tmpCurValue = realReader.getCurrentValue();
+      if (value != tmpCurValue) {
+        final Writable[] arrValue = value.get();
+        final Writable[] arrCurrent = tmpCurValue.get();
+        if (value != null && arrValue.length == arrCurrent.length) {
+          System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
+        } else {
+          if (arrValue.length != arrCurrent.length) {
+            throw new IOException("DeprecatedParquetHiveInput : size of object 
differs. Value" +
+              " size :  " + arrValue.length + ", Current Object size : " + 
arrCurrent.length);
+          } else {
+            throw new IOException("DeprecatedParquetHiveInput can not support 
RecordReaders that" +
+              " don't return same key & value & value is null");
+          }
+        }
+      }
+      return true;
+    } catch (final InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * gets a ParquetInputSplit corresponding to a split given by Hive
+   *
+   * @param oldSplit The split given by Hive
+   * @param conf The JobConf of the Hive job
+   * @return a ParquetInputSplit corresponding to the oldSplit
+   * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
+   */
+  @SuppressWarnings("deprecation")
+  protected ParquetInputSplit getSplit(
+      final InputSplit oldSplit,
+      final JobConf conf
+      ) throws IOException {
+    ParquetInputSplit split;
+    if (oldSplit instanceof FileSplit) {
+      final Path finalPath = ((FileSplit) oldSplit).getPath();
+
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+      final ReadContext readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+          null, fileMetaData.getSchema()));
+
+      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+          .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+      final long splitStart = ((FileSplit) oldSplit).getStart();
+      final long splitLength = ((FileSplit) oldSplit).getLength();
+      for (final BlockMetaData block : blocks) {
+        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+          splitGroup.add(block);
+        }
+      }
+      if (splitGroup.isEmpty()) {
+        LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+        return null;
+      }
+
+      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+      if (filter != null) {
+        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+        if (filtedBlocks.isEmpty()) {
+          LOG.debug("All row groups are dropped due to filter predicates");
+          return null;
+        }
+
+        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+        if (droppedBlocks > 0) {
+          LOG.debug("Dropping " + droppedBlocks + " row groups that do not 
pass filter predicate");
+        }
+      } else {
+        filtedBlocks = splitGroup;
+      }
+
+      split = new ParquetInputSplit(finalPath,
+          splitStart,
+          splitLength,
+          ((FileSplit) oldSplit).getLocations(),
+          filtedBlocks,
+          readContext.getRequestedSchema().toString(),
+          fileMetaData.getSchema().toString(),
+          fileMetaData.getKeyValueMetaData(),
+          readContext.getReadSupportMetadata());
+      return split;
+    } else {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+  }
+
+  public List<BlockMetaData> getFiltedBlocks() {
+    return filtedBlocks;
+  }
+}

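A wiring sketch (not part of the patch), assuming DataWritableReadSupport (referenced above) is
the read support class and "fileSplit" is a mapred FileSplit over a Parquet file; exception
handling is omitted.

    JobConf conf = new JobConf();
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class);
    ParquetRecordReaderWrapper reader =
        new ParquetRecordReaderWrapper(inputFormat, fileSplit, conf);
    ArrayWritable row = reader.createValue();
    while (reader.next(NullWritable.get(), row)) {
      // consume row.get(): one Writable per projected column
    }
    reader.close();
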
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/Repeated.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/Repeated.java b/java/bench/src/java/org/apache/orc/bench/parquet/Repeated.java
new file mode 100644
index 0000000..02f5351
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/Repeated.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Converters for repeated fields need to know when the parent field starts and
+ * ends to correctly build lists from the repeated values.
+ */
+public interface Repeated extends ConverterParent {
+
+  public void parentStart();
+
+  public void parentEnd();
+
+  abstract class RepeatedConverterParent extends PrimitiveConverter implements Repeated {
+    private Map<String, String> metadata;
+
+    public void setMetadata(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+
+    public Map<String, String> getMetadata() {
+      return metadata;
+    }
+  }
+
+  /**
+   * Stands in for a PrimitiveConverter and accumulates multiple values as an
+   * ArrayWritable.
+   */
+  class RepeatedPrimitiveConverter extends RepeatedConverterParent {
+    private final PrimitiveType primitiveType;
+    private final PrimitiveConverter wrapped;
+    private final ConverterParent parent;
+    private final int index;
+    private final List<Writable> list = new ArrayList<Writable>();
+
+    public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.primitiveType = primitiveType;
+      this.parent = parent;
+      this.index = index;
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(primitiveType, 0, this, hiveTypeInfo);
+    }
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return wrapped.hasDictionarySupport();
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      wrapped.setDictionary(dictionary);
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      wrapped.addValueFromDictionary(dictionaryId);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      wrapped.addBinary(value);
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      wrapped.addBoolean(value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      wrapped.addDouble(value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      wrapped.addFloat(value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      wrapped.addInt(value);
+    }
+
+    @Override
+    public void addLong(long value) {
+      wrapped.addLong(value);
+    }
+
+    @Override
+    public void parentStart() {
+      list.clear();
+    }
+
+    @Override
+    public void parentEnd() {
+      parent.set(index, new ArrayWritable(
+          Writable.class, list.toArray(new Writable[list.size()])));
+    }
+
+    @Override
+    public void set(int index, Writable value) {
+      list.add(value);
+    }
+  }
+
+  /**
+   * Stands in for a HiveGroupConverter and accumulates multiple values as an
+   * ArrayWritable.
+   */
+  class RepeatedGroupConverter extends HiveGroupConverter
+      implements Repeated {
+    private final GroupType groupType;
+    private final HiveGroupConverter wrapped;
+    private final ConverterParent parent;
+    private final int index;
+    private final List<Writable> list = new ArrayList<Writable>();
+    private final Map<String, String> metadata = new HashMap<String, String>();
+
+
+    public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.groupType = groupType;
+      this.parent = parent;
+      this.index = index;
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this, hiveTypeInfo);
+    }
+
+    @Override
+    public void set(int fieldIndex, Writable value) {
+      list.add(value);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      // delegate to the group's converters
+      return wrapped.getConverter(fieldIndex);
+    }
+
+    @Override
+    public void start() {
+      wrapped.start();
+    }
+
+    @Override
+    public void end() {
+      wrapped.end();
+    }
+
+    @Override
+    public void parentStart() {
+      list.clear();
+    }
+
+    @Override
+    public void parentEnd() {
+      parent.set(index, new ArrayWritable(
+          Writable.class, list.toArray(new Writable[list.size()])));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index c34a4f4..7335b54 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -381,6 +381,11 @@
         <version>1.1</version>
       </dependency>
       <dependency>
+        <groupId>org.jodd</groupId>
+        <artifactId>jodd-core</artifactId>
+        <version>3.5.2</version>
+      </dependency>
+      <dependency>
         <groupId>org.openjdk.jmh</groupId>
         <artifactId>jmh-core</artifactId>
         <version>1.12</version>

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/tools/src/java/org/apache/orc/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/PrintData.java b/java/tools/src/java/org/apache/orc/tools/PrintData.java
index ebd9ae1..5ed03a5 100644
--- a/java/tools/src/java/org/apache/orc/tools/PrintData.java
+++ b/java/tools/src/java/org/apache/orc/tools/PrintData.java
@@ -175,10 +175,10 @@ public class PrintData {
     }
   }
 
-  static void printRow(JSONWriter writer,
-                       VectorizedRowBatch batch,
-                       TypeDescription schema,
-                       int row) throws JSONException {
+  public static void printRow(JSONWriter writer,
+                              VectorizedRowBatch batch,
+                              TypeDescription schema,
+                              int row) throws JSONException {
     if (schema.getCategory() == TypeDescription.Category.STRUCT) {
       List<TypeDescription> fieldTypes = schema.getChildren();
       List<String> fieldNames = schema.getFieldNames();
