http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java b/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
new file mode 100644
index 0000000..13a822a
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/LeafFilterFactory.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator;
+
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+
+public class LeafFilterFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LeafFilterFactory.class);
+
+  class IntFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    /**
+     * @param op one of EQUALS, NULL_SAFE_EQUALS, LESS_THAN, LESS_THAN_EQUALS, IS_NULL
+     * @param literal the literal value to compare against, or null for IS_NULL
+     * @param columnName the name of the Parquet column
+     * @return the Parquet FilterPredicate for this leaf
+     */
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object literal,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(intColumn(columnName), ((Number) literal).intValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(intColumn(columnName),
+              (literal == null) ? null : ((Number) literal).intValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(intColumn(columnName), ((Number) literal).intValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class LongFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(FilterApi.longColumn(columnName), ((Number) constant).longValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(FilterApi.longColumn(columnName),
+              (constant == null) ? null : ((Number) constant).longValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(FilterApi.longColumn(columnName),
+              ((Number) constant).longValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class FloatFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant, String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(floatColumn(columnName), ((Number) constant).floatValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(floatColumn(columnName),
+              (constant == null) ? null : ((Number) constant).floatValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(FilterApi.floatColumn(columnName), ((Number) constant).floatValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class DoubleFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) {
+      switch (op) {
+        case LESS_THAN:
+          return lt(doubleColumn(columnName), ((Number) constant).doubleValue());
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(doubleColumn(columnName),
+              (constant == null) ? null : ((Number) constant).doubleValue());
+        case LESS_THAN_EQUALS:
+          return ltEq(FilterApi.doubleColumn(columnName),
+              ((Number) constant).doubleValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class BooleanFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) throws Exception {
+      switch (op) {
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(booleanColumn(columnName),
+              (constant == null) ? null : ((Boolean) constant).booleanValue());
+        default:
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  class BinaryFilterPredicateLeafBuilder extends FilterPredicateLeafBuilder {
+    @Override
+    public FilterPredicate buildPredict(Operator op, Object constant,
+                                        String columnName) throws Exception {
+      switch (op) {
+        case LESS_THAN:
+          return lt(binaryColumn(columnName), Binary.fromString((String) constant));
+        case IS_NULL:
+        case EQUALS:
+        case NULL_SAFE_EQUALS:
+          return eq(binaryColumn(columnName),
+              (constant == null) ? null : Binary.fromString((String) constant));
+        case LESS_THAN_EQUALS:
+          return ltEq(binaryColumn(columnName), Binary.fromString((String) constant));
+        default:
+          // should never be executed
+          throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+      }
+    }
+  }
+
+  /**
+   * Get the leaf filter builder by FilterPredicateType; currently date, decimal
+   * and timestamp are not supported yet.
+   * @param type FilterPredicateType
+   * @param parquetType the underlying Parquet type of the column
+   * @return the matching builder, or null if the type cannot be pushed down
+   */
+  public FilterPredicateLeafBuilder getLeafFilterBuilderByType(PredicateLeaf.Type type,
+                                                               Type parquetType) {
+    switch (type) {
+      case LONG:
+        if (parquetType.asPrimitiveType().getPrimitiveTypeName() ==
+            PrimitiveType.PrimitiveTypeName.INT32) {
+          return new IntFilterPredicateLeafBuilder();
+        } else {
+          return new LongFilterPredicateLeafBuilder();
+        }
+      case FLOAT:
+        if (parquetType.asPrimitiveType().getPrimitiveTypeName() ==
+            PrimitiveType.PrimitiveTypeName.FLOAT) {
+          return new FloatFilterPredicateLeafBuilder();
+        } else {
+          return new DoubleFilterPredicateLeafBuilder();
+        }
+      case STRING:  // string, char, varchar
+        return new BinaryFilterPredicateLeafBuilder();
+      case BOOLEAN:
+        return new BooleanFilterPredicateLeafBuilder();
+      case DATE:
+      case DECIMAL:
+      case TIMESTAMP:
+      default:
+        LOG.debug("Conversion to Parquet FilterPredicate not supported for " + type);
+        return null;
+    }
+  }
+}
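The factory above maps Hive predicate-leaf types onto Parquet column types, narrowing Hive's LONG literals to INT32 columns where needed. As a minimal usage sketch (not part of the patch; the demo class, the column "x" and the literal are invented, and it lives in the same package on the assumption that FilterPredicateLeafBuilder and the inner builders are package-visible):

    package org.apache.orc.bench.parquet;  // hypothetical demo in the same package

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class LeafFilterDemo {
      public static void main(String[] args) throws Exception {
        // A Parquet INT32 column named "x" (invented for the example).
        Type parquetType = Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("x");
        FilterPredicateLeafBuilder builder = new LeafFilterFactory()
            .getLeafFilterBuilderByType(PredicateLeaf.Type.LONG, parquetType);
        // Hive hands integral literals over as longs; the INT32 builder narrows them.
        FilterPredicate p = builder.buildPredict(PredicateLeaf.Operator.LESS_THAN, 100L, "x");
        System.out.println(p);  // roughly: lt(x, 100)
      }
    }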
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
new file mode 100644
index 0000000..bfb48a9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/MapredParquetOutputFormat.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+
+/**
+ * A Parquet OutputFormat for Hive (with the deprecated package mapred).
+ */
+public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
+    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MapredParquetOutputFormat.class);
+
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
+
+  public MapredParquetOutputFormat() {
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
+  }
+
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
+  }
+
+  @Override
+  public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
+    realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
+  }
+
+  @Override
+  public RecordWriter<NullWritable, ParquetHiveRecord> getRecordWriter(
+      final FileSystem ignored,
+      final JobConf job,
+      final String name,
+      final Progressable progress
+      ) throws IOException {
+    throw new RuntimeException("Should never be used");
+  }
+
+  /**
+   * Create the Parquet schema from the Hive schema, and return the
+   * RecordWriterWrapper which contains the real output format.
+   */
+  @Override
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      final JobConf jobConf,
+      final Path finalOutPath,
+      final Class<? extends Writable> valueClass,
+      final boolean isCompressed,
+      final Properties tableProperties,
+      final Progressable progress) throws IOException {
+
+    LOG.info("creating new record writer..." + this);
+
+    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
+    List<String> columnNames;
+    List<TypeInfo> columnTypes;
+
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
+
+    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+        progress, tableProperties);
+  }
+
+  protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
+      JobConf jobConf,
+      String finalOutPath,
+      Progressable progress,
+      Properties tableProperties
+      ) throws IOException {
+    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+        progress, tableProperties);
+  }
+}
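For orientation, here is a hedged sketch of how a caller would obtain a writer through getHiveRecordWriter above; the output path and column layout are invented. Hive passes column names comma-separated and column types colon-separated, which is exactly what the property parsing expects:

    import java.util.Properties;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
    import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;

    public class ParquetWriteDemo {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        Properties tableProps = new Properties();
        tableProps.setProperty(IOConstants.COLUMNS, "id,name");             // comma-separated names
        tableProps.setProperty(IOConstants.COLUMNS_TYPES, "bigint:string"); // colon-separated types
        FileSinkOperator.RecordWriter writer = new MapredParquetOutputFormat()
            .getHiveRecordWriter(conf, new Path("/tmp/demo.parquet"),      // hypothetical path
                ParquetHiveRecord.class, false /* isCompressed */, tableProps, Reporter.NULL);
        writer.close(false);  // a real caller would write ParquetHiveRecord values first
      }
    }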
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
new file mode 100644
index 0000000..e2ee7a6
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTime.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+
+/**
+ * Provides a wrapper representing a parquet-timestamp, with methods to
+ * convert to and from binary.
+ */
+public class NanoTime {
+  private final int julianDay;
+  private final long timeOfDayNanos;
+
+  public static NanoTime fromBinary(Binary bytes) {
+    Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes");
+    ByteBuffer buf = bytes.toByteBuffer();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    long timeOfDayNanos = buf.getLong();
+    int julianDay = buf.getInt();
+    return new NanoTime(julianDay, timeOfDayNanos);
+  }
+
+  public NanoTime(int julianDay, long timeOfDayNanos) {
+    this.julianDay = julianDay;
+    this.timeOfDayNanos = timeOfDayNanos;
+  }
+
+  public int getJulianDay() {
+    return julianDay;
+  }
+
+  public long getTimeOfDayNanos() {
+    return timeOfDayNanos;
+  }
+
+  public Binary toBinary() {
+    ByteBuffer buf = ByteBuffer.allocate(12);
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    buf.putLong(timeOfDayNanos);
+    buf.putInt(julianDay);
+    buf.flip();
+    return Binary.fromByteBuffer(buf);
+  }
+
+  public void writeValue(RecordConsumer recordConsumer) {
+    recordConsumer.addBinary(toBinary());
+  }
+
+  @Override
+  public String toString() {
+    return "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}";
+  }
+}
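NanoTime packs a timestamp into Parquet's 12-byte INT96 layout: eight little-endian bytes of nanos-of-day followed by four little-endian bytes of Julian day. A small round-trip sketch (the values are invented; 2440588 is the Julian Day Number of 1970-01-01):

    import org.apache.orc.bench.parquet.NanoTime;
    import org.apache.parquet.io.api.Binary;

    public class NanoTimeDemo {
      public static void main(String[] args) {
        // One hour past midnight on the Unix epoch day.
        NanoTime nt = new NanoTime(2440588, 3600L * 1_000_000_000L);
        Binary bin = nt.toBinary();                 // 12 bytes: nanos (LE long), then day (LE int)
        NanoTime back = NanoTime.fromBinary(bin);
        System.out.println(back);                   // NanoTime{julianDay=2440588, timeOfDayNanos=3600000000000}
      }
    }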
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
new file mode 100644
index 0000000..ae3408c
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/NanoTimeUtils.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import jodd.datetime.JDateTime;
+
+/**
+ * Utilities for converting from java.sql.Timestamp to parquet timestamp.
+ * This utilizes the Jodd library.
+ */
+public class NanoTimeUtils {
+  static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
+  static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
+  static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+  static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+
+  private static final ThreadLocal<Calendar> parquetGMTCalendar = new ThreadLocal<Calendar>();
+  private static final ThreadLocal<Calendar> parquetLocalCalendar = new ThreadLocal<Calendar>();
+
+  private static Calendar getGMTCalendar() {
+    // Calendar.getInstance calculates the current time needlessly, so cache an instance.
+    if (parquetGMTCalendar.get() == null) {
+      parquetGMTCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")));
+    }
+    return parquetGMTCalendar.get();
+  }
+
+  private static Calendar getLocalCalendar() {
+    if (parquetLocalCalendar.get() == null) {
+      parquetLocalCalendar.set(Calendar.getInstance());
+    }
+    return parquetLocalCalendar.get();
+  }
+
+  private static Calendar getCalendar(boolean skipConversion) {
+    Calendar calendar = skipConversion ? getLocalCalendar() : getGMTCalendar();
+    calendar.clear(); // Reset all fields before reusing this instance
+    return calendar;
+  }
+
+  public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
+
+    Calendar calendar = getCalendar(skipConversion);
+    calendar.setTime(ts);
+    int year = calendar.get(Calendar.YEAR);
+    if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) {
+      year = 1 - year;
+    }
+    JDateTime jDateTime = new JDateTime(year,
+        calendar.get(Calendar.MONTH) + 1, // Calendar months are 0-based; JDateTime expects 1-based.
+        calendar.get(Calendar.DAY_OF_MONTH));
+    int days = jDateTime.getJulianDayNumber();
+
+    long hour = calendar.get(Calendar.HOUR_OF_DAY);
+    long minute = calendar.get(Calendar.MINUTE);
+    long second = calendar.get(Calendar.SECOND);
+    long nanos = ts.getNanos();
+    long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute +
+        NANOS_PER_HOUR * hour;
+
+    return new NanoTime(days, nanosOfDay);
+  }
+
+  public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
+    int julianDay = nt.getJulianDay();
+    long nanosOfDay = nt.getTimeOfDayNanos();
+
+    long remainder = nanosOfDay;
+    julianDay += remainder / NANOS_PER_DAY;
+    remainder %= NANOS_PER_DAY;
+    if (remainder < 0) {
+      remainder += NANOS_PER_DAY;
+      julianDay--;
+    }
+
+    JDateTime jDateTime = new JDateTime((double) julianDay);
+    Calendar calendar = getCalendar(skipConversion);
+    calendar.set(Calendar.YEAR, jDateTime.getYear());
+    calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1); // JDateTime months are 1-based; Calendar expects 0-based.
+    calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay());
+
+    int hour = (int) (remainder / (NANOS_PER_HOUR));
+    remainder = remainder % (NANOS_PER_HOUR);
+    int minutes = (int) (remainder / (NANOS_PER_MINUTE));
+    remainder = remainder % (NANOS_PER_MINUTE);
+    int seconds = (int) (remainder / (NANOS_PER_SECOND));
+    long nanos = remainder % NANOS_PER_SECOND;
+
+    calendar.set(Calendar.HOUR_OF_DAY, hour);
+    calendar.set(Calendar.MINUTE, minutes);
+    calendar.set(Calendar.SECOND, seconds);
+    Timestamp ts = new Timestamp(calendar.getTimeInMillis());
+    ts.setNanos((int) nanos);
+    return ts;
+  }
+}
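A round-trip sketch for the conversion utilities (the timestamp value is invented; skipConversion=false runs the conversion through GMT, while skipConversion=true keeps the local calendar reading):

    import java.sql.Timestamp;
    import org.apache.orc.bench.parquet.NanoTime;
    import org.apache.orc.bench.parquet.NanoTimeUtils;

    public class NanoTimeUtilsDemo {
      public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2017-03-15 10:30:00.123456789");
        NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);    // normalize via GMT
        Timestamp back = NanoTimeUtils.getTimestamp(nt, false);
        System.out.println(ts.equals(back));                   // expected: true
      }
    }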
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
new file mode 100644
index 0000000..a1d3006
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetFilterPredicateConverter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ParquetFilterPredicateConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetFilterPredicateConverter.class);
+
+  /**
+   * Translate the search argument to the filter predicate Parquet uses. It includes
+   * only the columns from the passed schema.
+   * @return the translated filter predicate, or null if nothing can be pushed down
+   */
+  public static FilterPredicate toFilterPredicate(SearchArgument sarg, MessageType schema) {
+    Set<String> columns = null;
+    if (schema != null) {
+      columns = new HashSet<String>();
+      for (Type field : schema.getFields()) {
+        columns.add(field.getName());
+      }
+    }
+
+    return translate(sarg.getExpression(), sarg.getLeaves(), columns, schema);
+  }
+
+  private static FilterPredicate translate(ExpressionTree root,
+                                           List<PredicateLeaf> leaves,
+                                           Set<String> columns,
+                                           MessageType schema) {
+    FilterPredicate p = null;
+    switch (root.getOperator()) {
+      case OR:
+        for (ExpressionTree child : root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns, schema);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns, schema);
+            // a constant child translates to null (no filter); ignore it
+            if (right != null) {
+              p = FilterApi.or(p, right);
+            }
+          }
+        }
+        return p;
+      case AND:
+        for (ExpressionTree child : root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns, schema);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns, schema);
+            // a constant child translates to null (no filter); ignore it
+            if (right != null) {
+              p = FilterApi.and(p, right);
+            }
+          }
+        }
+        return p;
+      case NOT:
+        FilterPredicate op = translate(root.getChildren().get(0), leaves,
+            columns, schema);
+        if (op != null) {
+          return FilterApi.not(op);
+        } else {
+          return null;
+        }
+      case LEAF:
+        PredicateLeaf leaf = leaves.get(root.getLeaf());
+
+        // Create the leaf predicate only when the column is part of the passed schema.
+        if (columns.contains(leaf.getColumnName())) {
+          Type parquetType = schema.getType(leaf.getColumnName());
+          return buildFilterPredicateFromPredicateLeaf(leaf, parquetType);
+        } else {
+          // Do not create a predicate if the leaf is not on the passed schema.
+          return null;
+        }
+      case CONSTANT:
+        return null; // no filter will be executed for a constant
+      default:
+        throw new IllegalStateException("Unknown operator: " +
+            root.getOperator());
+    }
+  }
+
+  private static FilterPredicate buildFilterPredicateFromPredicateLeaf(
+      PredicateLeaf leaf, Type parquetType) {
+    LeafFilterFactory leafFilterFactory = new LeafFilterFactory();
+    FilterPredicateLeafBuilder builder;
+    try {
+      builder = leafFilterFactory
+          .getLeafFilterBuilderByType(leaf.getType(), parquetType);
+      if (builder == null) {
+        return null;
+      }
+      if (isMultiLiteralsOperator(leaf.getOperator())) {
+        return builder.buildPredicate(leaf.getOperator(),
+            leaf.getLiteralList(),
+            leaf.getColumnName());
+      } else {
+        return builder
+            .buildPredict(leaf.getOperator(),
+                leaf.getLiteral(),
+                leaf.getColumnName());
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to build predicate filter leaf: " + e, e);
+      return null;
+    }
+  }
+
+  private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) {
+    return (op == PredicateLeaf.Operator.IN) ||
+        (op == PredicateLeaf.Operator.BETWEEN);
+  }
+}
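To see the converter end-to-end, here is a hedged sketch that builds a two-leaf SearchArgument and translates it against a matching Parquet schema. The schema and predicate are invented, and it assumes the storage-api flavor of the SearchArgument builder that takes an explicit PredicateLeaf.Type:

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.orc.bench.parquet.ParquetFilterPredicateConverter;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class SargToParquetDemo {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message demo { required int64 id; required binary name; }");
        SearchArgument sarg = SearchArgumentFactory.newBuilder()
            .startAnd()
              .lessThan("id", PredicateLeaf.Type.LONG, 100L)
              .equals("name", PredicateLeaf.Type.STRING, "alice")
            .end()
            .build();
        // Leaves whose columns are missing from the schema would be dropped.
        FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
        System.out.println(p);  // roughly: and(lt(id, 100), eq(name, Binary{"alice"}))
      }
    }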
http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
new file mode 100644
index 0000000..fdbc689
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetRecordReaderWrapper.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import com.google.common.base.Strings;
+
+public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> {
+  public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderWrapper.class);
+
+  private final long splitLen; // for getPos()
+
+  private org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> realReader;
+  // expect realReader to return the same key & value objects (common case)
+  // this avoids extra serialization & deserialization of these objects
+  private ArrayWritable valueObj = null;
+  private boolean firstRecord = false;
+  private boolean eof = false;
+  private int schemaSize;
+  private boolean skipTimestampConversion = false;
+  private JobConf jobConf;
+  private List<BlockMetaData> filtedBlocks;
+
+  public ParquetRecordReaderWrapper(
+      final ParquetInputFormat<ArrayWritable> newInputFormat,
+      final InputSplit oldSplit,
+      final JobConf oldJobConf)
+      throws IOException, InterruptedException {
+    this.splitLen = oldSplit.getLength();
+
+    jobConf = oldJobConf;
+    final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
+    if (taskAttemptID == null) {
+      taskAttemptID = new TaskAttemptID();
+    }
+
+    // create a TaskInputOutputContext
+    Configuration conf = jobConf;
+
+    final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    if (split != null) {
+      try {
+        realReader = newInputFormat.createRecordReader(split, taskContext);
+        realReader.initialize(split, taskContext);
+
+        // read once to gain access to key and value objects
+        if (realReader.nextKeyValue()) {
+          firstRecord = true;
+          valueObj = realReader.getCurrentValue();
+        } else {
+          eof = true;
+        }
+      } catch (final InterruptedException e) {
+        throw new IOException(e);
+      }
+    } else {
+      realReader = null;
+      eof = true;
+    }
+    if (valueObj == null) { // Should initialize the value for createValue
+      valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]);
+    }
+  }
+
+  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
+    SearchArgument sarg = null; // ConvertAstToSearchArg.createFromConf(conf);
+    if (sarg == null) {
+      return null;
+    }
+
+    // Create the Parquet FilterPredicate without including columns that do not exist
+    // on the schema (such as partition columns).
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
+    if (p != null) {
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("PARQUET predicate push down generated.");
+      ParquetInputFormat.setFilterPredicate(conf, p);
+      return FilterCompat.get(p);
+    } else {
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("No PARQUET predicate push down is generated.");
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (realReader != null) {
+      realReader.close();
+    }
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return (long) (splitLen * getProgress());
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (realReader == null) {
+      return 1f;
+    } else {
+      try {
+        return realReader.getProgress();
+      } catch (final InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean next(final NullWritable key, final ArrayWritable value) throws IOException {
+    if (eof) {
+      return false;
+    }
+    try {
+      if (firstRecord) { // key & value are already read.
+        firstRecord = false;
+      } else if (!realReader.nextKeyValue()) {
+        eof = true; // strictly not required, just for consistency
+        return false;
+      }
+
+      final ArrayWritable tmpCurValue = realReader.getCurrentValue();
+      if (value != tmpCurValue) {
+        final Writable[] arrValue = value.get();
+        final Writable[] arrCurrent = tmpCurValue.get();
+        if (value != null && arrValue.length == arrCurrent.length) {
+          System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
+        } else {
+          if (arrValue.length != arrCurrent.length) {
+            throw new IOException("DeprecatedParquetHiveInput : size of object differs. Value" +
+                " size : " + arrValue.length + ", Current Object size : " + arrCurrent.length);
+          } else {
+            throw new IOException("DeprecatedParquetHiveInput can not support RecordReaders that" +
+                " don't return same key & value & value is null");
+          }
+        }
+      }
+      return true;
+    } catch (final InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Gets a ParquetInputSplit corresponding to a split given by Hive.
+   *
+   * @param oldSplit The split given by Hive
+   * @param conf The JobConf of the Hive job
+   * @return a ParquetInputSplit corresponding to the oldSplit
+   * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
+   */
+  @SuppressWarnings("deprecation")
+  protected ParquetInputSplit getSplit(
+      final InputSplit oldSplit,
+      final JobConf conf
+      ) throws IOException {
+    ParquetInputSplit split;
+    if (oldSplit instanceof FileSplit) {
+      final Path finalPath = ((FileSplit) oldSplit).getPath();
+
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+      final ReadContext readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+          null, fileMetaData.getSchema()));
+
+      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+          .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+      final long splitStart = ((FileSplit) oldSplit).getStart();
+      final long splitLength = ((FileSplit) oldSplit).getLength();
+      for (final BlockMetaData block : blocks) {
+        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+          splitGroup.add(block);
+        }
+      }
+      if (splitGroup.isEmpty()) {
+        LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+        return null;
+      }
+
+      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+      if (filter != null) {
+        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+        if (filtedBlocks.isEmpty()) {
+          LOG.debug("All row groups are dropped due to filter predicates");
+          return null;
+        }
+
+        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+        if (droppedBlocks > 0) {
+          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
+        }
+      } else {
+        filtedBlocks = splitGroup;
+      }
+
+      split = new ParquetInputSplit(finalPath,
+          splitStart,
+          splitLength,
+          ((FileSplit) oldSplit).getLocations(),
+          filtedBlocks,
+          readContext.getRequestedSchema().toString(),
+          fileMetaData.getSchema().toString(),
+          fileMetaData.getKeyValueMetaData(),
+          readContext.getReadSupportMetadata());
+      return split;
+    } else {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+  }
+
+  public List<BlockMetaData> getFiltedBlocks() {
+    return filtedBlocks;
+  }
+}
b/java/bench/src/java/org/apache/orc/bench/parquet/Repeated.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Converters for repeated fields need to know when the parent field starts and
+ * ends to correctly build lists from the repeated values.
+ */
+public interface Repeated extends ConverterParent {
+
+  public void parentStart();
+
+  public void parentEnd();
+
+  abstract class RepeatedConverterParent extends PrimitiveConverter implements Repeated {
+    private Map<String, String> metadata;
+
+    public void setMetadata(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+
+    public Map<String, String> getMetadata() {
+      return metadata;
+    }
+  }
+
+  /**
+   * Stands in for a PrimitiveConverter and accumulates multiple values as an
+   * ArrayWritable.
+   */
+  class RepeatedPrimitiveConverter extends RepeatedConverterParent {
+    private final PrimitiveType primitiveType;
+    private final PrimitiveConverter wrapped;
+    private final ConverterParent parent;
+    private final int index;
+    private final List<Writable> list = new ArrayList<Writable>();
+
+    public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.primitiveType = primitiveType;
+      this.parent = parent;
+      this.index = index;
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(primitiveType, 0, this, hiveTypeInfo);
+    }
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return wrapped.hasDictionarySupport();
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      wrapped.setDictionary(dictionary);
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      wrapped.addValueFromDictionary(dictionaryId);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      wrapped.addBinary(value);
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      wrapped.addBoolean(value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      wrapped.addDouble(value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      wrapped.addFloat(value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      wrapped.addInt(value);
+    }
+
+    @Override
+    public void addLong(long value) {
+      wrapped.addLong(value);
+    }
+
+    @Override
+    public void parentStart() {
+      list.clear();
+    }
+
+    @Override
+    public void parentEnd() {
+      parent.set(index, new ArrayWritable(
+          Writable.class, list.toArray(new Writable[list.size()])));
+    }
+
+    @Override
+    public void set(int index, Writable value) {
+      list.add(value);
+    }
+  }
+
+  /**
+   * Stands in for a HiveGroupConverter and accumulates multiple values as an
+   * ArrayWritable.
+   */
+  class RepeatedGroupConverter extends HiveGroupConverter
+      implements Repeated {
+    private final GroupType groupType;
+    private final HiveGroupConverter wrapped;
+    private final ConverterParent parent;
+    private final int index;
+    private final List<Writable> list = new ArrayList<Writable>();
+    private final Map<String, String> metadata = new HashMap<String, String>();
+
+    public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.groupType = groupType;
+      this.parent = parent;
+      this.index = index;
+      this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this, hiveTypeInfo);
+    }
+
+    @Override
+    public void set(int fieldIndex, Writable value) {
+      list.add(value);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      // delegate to the group's converters
+      return wrapped.getConverter(fieldIndex);
+    }
+
+    @Override
+    public void start() {
+      wrapped.start();
+    }
+
+    @Override
+    public void end() {
+      wrapped.end();
+    }
+
+    @Override
+    public void parentStart() {
+      list.clear();
+    }
+
+    @Override
+    public void parentEnd() {
+      parent.set(index, new ArrayWritable(
+          Writable.class, list.toArray(new Writable[list.size()])));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index c34a4f4..7335b54 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -381,6 +381,11 @@
       <version>1.1</version>
     </dependency>
     <dependency>
+      <groupId>org.jodd</groupId>
+      <artifactId>jodd-core</artifactId>
+      <version>3.5.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.openjdk.jmh</groupId>
       <artifactId>jmh-core</artifactId>
       <version>1.12</version>

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/tools/src/java/org/apache/orc/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/PrintData.java b/java/tools/src/java/org/apache/orc/tools/PrintData.java
index ebd9ae1..5ed03a5 100644
--- a/java/tools/src/java/org/apache/orc/tools/PrintData.java
+++ b/java/tools/src/java/org/apache/orc/tools/PrintData.java
@@ -175,10 +175,10 @@ public class PrintData {
     }
   }
 
-  static void printRow(JSONWriter writer,
-                       VectorizedRowBatch batch,
-                       TypeDescription schema,
-                       int row) throws JSONException {
+  public static void printRow(JSONWriter writer,
+                              VectorizedRowBatch batch,
+                              TypeDescription schema,
+                              int row) throws JSONException {
     if (schema.getCategory() == TypeDescription.Category.STRUCT) {
       List<TypeDescription> fieldTypes = schema.getChildren();
       List<String> fieldNames = schema.getFieldNames();
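Finally, the PrintData change only widens printRow's visibility so benchmark code can reuse it. A hedged sketch of the kind of call this enables (the helper class is invented, and it assumes the jettison JSONWriter that PrintData already imports):

    import java.io.PrintWriter;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.tools.PrintData;
    import org.codehaus.jettison.json.JSONWriter;

    public class PrintRowDemo {
      // Dump the first row of a batch as JSON; the batch and schema come from the caller.
      static void dumpFirstRow(VectorizedRowBatch batch, TypeDescription schema) throws Exception {
        JSONWriter writer = new JSONWriter(new PrintWriter(System.out, true));
        if (batch.size > 0) {
          PrintData.printRow(writer, batch, schema, 0);
        }
      }
    }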