http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
new file mode 100644
index 0000000..87c8ac9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+
+import java.util.Map;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
+
+public class HadoopReadOptions extends ParquetReadOptions {
+  private final Configuration conf;
+
+  private HadoopReadOptions(boolean useSignedStringMinMax,
+                            boolean useStatsFilter,
+                            boolean useDictionaryFilter,
+                            boolean useRecordFilter,
+                            FilterCompat.Filter recordFilter,
+                            MetadataFilter metadataFilter,
+                            CompressionCodecFactory codecFactory,
+                            ByteBufferAllocator allocator,
+                            Map<String, String> properties,
+                            Configuration conf) {
+    super(
+        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
+        metadataFilter, codecFactory, allocator, properties
+    );
+    this.conf = conf;
+  }
+
+  @Override
+  public String getProperty(String property) {
+    String value = super.getProperty(property);
+    if (value != null) {
+      return value;
+    }
+    return conf.get(property);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static Builder builder(Configuration conf) {
+    return new Builder(conf);
+  }
+
+  public static class Builder extends ParquetReadOptions.Builder {
+    private final Configuration conf;
+
+    public Builder(Configuration conf) {
+      this.conf = conf;
+      useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+      useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
+      useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+      useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+      withCodecFactory(HadoopCodecs.newFactory(conf, 0));
+      withRecordFilter(getFilter(conf));
+      String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
+      if (badRecordThresh != null) {
+        set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
+      }
+    }
+
+    @Override
+    public ParquetReadOptions build() {
+      return new HadoopReadOptions(
+          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+          recordFilter, metadataFilter, codecFactory, allocator, properties, conf);
    }
+  }
+}
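A rough usage sketch (not part of this commit): the Hadoop-backed builder above, combined with the new ParquetFileReader.open(InputFile, ParquetReadOptions) overload added further down in this patch. The driver class and the file path argument are illustrative only; the Parquet class and method names are taken from the diffs in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class ReadOptionsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InputFile file = HadoopInputFile.fromPath(new Path(args[0]), conf);

    // The builder seeds its defaults from the Hadoop Configuration;
    // explicit calls override them.
    ParquetReadOptions options = HadoopReadOptions.builder(conf)
        .useStatsFilter(true)        // row-group filtering on column statistics
        .useDictionaryFilter(true)   // row-group filtering on dictionary pages
        .build();

    // ParquetFileReader implements Closeable, so try-with-resources works.
    try (ParquetFileReader reader = ParquetFileReader.open(file, options)) {
      System.out.println("row groups: " + reader.getRowGroups().size());
      System.out.println("records: " + reader.getRecordCount());
    }
  }
}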
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java new file mode 100644 index 0000000..5f2f0a8 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet; + +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +// Internal use only +public class ParquetReadOptions { + private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; + private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; + private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; + + private final boolean useSignedStringMinMax; + private final boolean useStatsFilter; + private final boolean useDictionaryFilter; + private final boolean useRecordFilter; + private final FilterCompat.Filter recordFilter; + private final ParquetMetadataConverter.MetadataFilter metadataFilter; + private final CompressionCodecFactory codecFactory; + private final ByteBufferAllocator allocator; + private final Map<String, String> properties; + + ParquetReadOptions(boolean useSignedStringMinMax, + boolean useStatsFilter, + boolean useDictionaryFilter, + boolean useRecordFilter, + FilterCompat.Filter recordFilter, + ParquetMetadataConverter.MetadataFilter metadataFilter, + CompressionCodecFactory codecFactory, + ByteBufferAllocator allocator, + Map<String, String> properties) { + this.useSignedStringMinMax = useSignedStringMinMax; + this.useStatsFilter = useStatsFilter; + this.useDictionaryFilter = useDictionaryFilter; + this.useRecordFilter = useRecordFilter; + this.recordFilter = recordFilter; + this.metadataFilter = metadataFilter; + this.codecFactory = codecFactory; + this.allocator = allocator; + this.properties = Collections.unmodifiableMap(properties); + } + + public boolean useSignedStringMinMax() { + return useSignedStringMinMax; + } + + public boolean useStatsFilter() { + return useStatsFilter; + } + + public boolean 
useDictionaryFilter() { + return useDictionaryFilter; + } + + public boolean useRecordFilter() { + return useRecordFilter; + } + + public FilterCompat.Filter getRecordFilter() { + return recordFilter; + } + + public ParquetMetadataConverter.MetadataFilter getMetadataFilter() { + return metadataFilter; + } + + public CompressionCodecFactory getCodecFactory() { + return codecFactory; + } + + public ByteBufferAllocator getAllocator() { + return allocator; + } + + public Set<String> getPropertyNames() { + return properties.keySet(); + } + + public String getProperty(String property) { + return properties.get(property); + } + + public boolean isEnabled(String property, boolean defaultValue) { + if (properties.containsKey(property)) { + return Boolean.valueOf(properties.get(property)); + } else { + return defaultValue; + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + boolean useSignedStringMinMax = false; + boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; + boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; + boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; + FilterCompat.Filter recordFilter = null; + ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; + // the page size parameter isn't used when only using the codec factory to get decompressors + CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + ByteBufferAllocator allocator = new HeapByteBufferAllocator(); + Map<String, String> properties = new HashMap<>(); + + public Builder useSignedStringMinMax(boolean useSignedStringMinMax) { + this.useSignedStringMinMax = useSignedStringMinMax; + return this; + } + + public Builder useSignedStringMinMax() { + this.useSignedStringMinMax = true; + return this; + } + + public Builder useStatsFilter(boolean useStatsFilter) { + this.useStatsFilter = useStatsFilter; + return this; + } + + public Builder useStatsFilter() { + this.useStatsFilter = true; + return this; + } + + public Builder useDictionaryFilter(boolean useDictionaryFilter) { + this.useDictionaryFilter = useDictionaryFilter; + return this; + } + + public Builder useDictionaryFilter() { + this.useDictionaryFilter = true; + return this; + } + + public Builder useRecordFilter(boolean useRecordFilter) { + this.useRecordFilter = useRecordFilter; + return this; + } + + public Builder useRecordFilter() { + this.useRecordFilter = true; + return this; + } + + public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) { + this.recordFilter = rowGroupFilter; + return this; + } + + public Builder withRange(long start, long end) { + this.metadataFilter = ParquetMetadataConverter.range(start, end); + return this; + } + + public Builder withOffsets(long... 
rowGroupOffsets) { + this.metadataFilter = ParquetMetadataConverter.offsets(rowGroupOffsets); + return this; + } + + public Builder withMetadataFilter(ParquetMetadataConverter.MetadataFilter metadataFilter) { + this.metadataFilter = metadataFilter; + return this; + } + + public Builder withCodecFactory(CompressionCodecFactory codecFactory) { + this.codecFactory = codecFactory; + return this; + } + + public Builder withAllocator(ByteBufferAllocator allocator) { + this.allocator = allocator; + return this; + } + + public Builder set(String key, String value) { + properties.put(key, value); + return this; + } + + public Builder copy(ParquetReadOptions options) { + useSignedStringMinMax(options.useSignedStringMinMax); + useStatsFilter(options.useStatsFilter); + useDictionaryFilter(options.useDictionaryFilter); + useRecordFilter(options.useRecordFilter); + withRecordFilter(options.recordFilter); + withMetadataFilter(options.metadataFilter); + withCodecFactory(options.codecFactory); + withAllocator(options.allocator); + for (Map.Entry<String, String> keyValue : options.properties.entrySet()) { + set(keyValue.getKey(), keyValue.getValue()); + } + return this; + } + + public ParquetReadOptions build() { + return new ParquetReadOptions( + useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, + recordFilter, metadataFilter, codecFactory, allocator, properties); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java index fd74799..68c38ce 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java @@ -51,6 +51,10 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> { DICTIONARY } + /** + * @deprecated will be removed in 2.0.0. 
+ */ + @Deprecated public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) { checkNotNull(filter, "filter"); return filter.accept(new RowGroupFilter(blocks, schema)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index aeb6152..bba7e62 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -38,6 +38,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.PageEncodingStats; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.ColumnChunk; @@ -89,10 +91,18 @@ public class ParquetMetadataConverter { this(false); } + /** + * @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)} + */ + @Deprecated public ParquetMetadataConverter(Configuration conf) { this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); } + public ParquetMetadataConverter(ParquetReadOptions options) { + this(options.useSignedStringMinMax()); + } + private ParquetMetadataConverter(boolean useSignedStringMinMax) { this.useSignedStringMinMax = useSignedStringMinMax; } @@ -193,7 +203,7 @@ public class ParquetMetadataConverter { getType(columnMetaData.getType()), toFormatEncodings(columnMetaData.getEncodings()), Arrays.asList(columnMetaData.getPath().toArray()), - columnMetaData.getCodec().getParquetCompressionCodec(), + toFormatCodec(columnMetaData.getCodec()), columnMetaData.getValueCount(), columnMetaData.getTotalUncompressedSize(), columnMetaData.getTotalSize(), @@ -246,6 +256,14 @@ public class ParquetMetadataConverter { return cached; } + private CompressionCodecName fromFormatCodec(CompressionCodec codec) { + return CompressionCodecName.valueOf(codec.toString()); + } + + private CompressionCodec toFormatCodec(CompressionCodecName codec) { + return CompressionCodec.valueOf(codec.toString()); + } + public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) { return org.apache.parquet.column.Encoding.valueOf(encoding.name()); } @@ -820,7 +838,7 @@ public class ParquetMetadataConverter { ColumnChunkMetaData column = ColumnChunkMetaData.get( path, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(), - CompressionCodecName.fromParquet(metaData.codec), + fromFormatCodec(metaData.codec), convertEncodingStats(metaData.getEncoding_stats()), fromFormatEncodings(metaData.encodings), fromParquetStatistics( http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 8bf882f..8befa79 100644 --- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -36,9 +36,10 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -public class CodecFactory { +public class CodecFactory implements CompressionCodecFactory { protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections .synchronizedMap(new HashMap<String, CompressionCodec>()); @@ -118,7 +119,7 @@ public class CodecFactory { output.put(decompressed); } - protected void release() { + public void release() { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); } @@ -171,7 +172,7 @@ public class CodecFactory { } @Override - protected void release() { + public void release() { if (compressor != null) { CodecPool.returnCompressor(compressor); } @@ -183,6 +184,7 @@ public class CodecFactory { } + @Override public BytesCompressor getCompressor(CompressionCodecName codecName) { BytesCompressor comp = compressors.get(codecName); if (comp == null) { @@ -192,6 +194,7 @@ public class CodecFactory { return comp; } + @Override public BytesDecompressor getDecompressor(CompressionCodecName codecName) { BytesDecompressor decomp = decompressors.get(codecName); if (decomp == null) { @@ -235,6 +238,7 @@ public class CodecFactory { } } + @Override public void release() { for (BytesCompressor compressor : compressors.values()) { compressor.release(); @@ -246,15 +250,23 @@ public class CodecFactory { decompressors.clear(); } - public static abstract class BytesCompressor { + /** + * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputCompressor instead. + */ + @Deprecated + public static abstract class BytesCompressor implements CompressionCodecFactory.BytesInputCompressor { public abstract BytesInput compress(BytesInput bytes) throws IOException; public abstract CompressionCodecName getCodecName(); - protected abstract void release(); + public abstract void release(); } - public static abstract class BytesDecompressor { + /** + * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputDecompressor instead. 
+ */ + @Deprecated + public static abstract class BytesDecompressor implements CompressionCodecFactory.BytesInputDecompressor { public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException; public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException; - protected abstract void release(); + public abstract void release(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index f067679..37dfd6d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -33,6 +33,8 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; @@ -56,12 +58,12 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore */ static final class ColumnChunkPageReader implements PageReader { - private final BytesDecompressor decompressor; + private final BytesInputDecompressor decompressor; private final long valueCount; private final List<DataPage> compressedPages; private final DictionaryPage compressedDictionaryPage; - ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) { + ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) { this.decompressor = decompressor; this.compressedPages = new LinkedList<DataPage>(compressedPages); this.compressedDictionaryPage = compressedDictionaryPage; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index 344f3ec..58e79ac 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -179,7 +179,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { } @Override - protected void release() { + public void release() { DirectCodecPool.INSTANCE.returnDecompressor(decompressor); } } @@ -221,7 +221,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { } @Override - protected void release() { + public void release() { DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor); extraDecompressor.release(); } @@ -245,7 +245,7 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable { } @Override - protected void release() {} + public void release() {} } @@ -269,7 +269,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { } @Override - protected void release() {} + public void release() {} } public class SnappyCompressor extends BytesCompressor { @@ -311,7 +311,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { } @Override - protected void release() { + public void release() { outgoing = DirectCodecFactory.this.release(outgoing); incoming = DirectCodecFactory.this.release(incoming); } @@ -333,7 +333,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { } @Override - protected void release() {} + public void release() {} } static class DirectCodecPool { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 88b3d2d..a048878 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -27,6 +27,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; @@ -47,7 +49,6 @@ import org.slf4j.LoggerFactory; import static java.lang.String.format; import static org.apache.parquet.Preconditions.checkNotNull; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING; class InternalParquetRecordReader<T> { @@ -160,6 +161,34 @@ class InternalParquetRecordReader<T> { return (float) current / total; } + public void initialize(ParquetFileReader reader, ParquetReadOptions options) { + // copy custom configuration to the Configuration passed to the ReadSupport + Configuration conf = new Configuration(); + if (options instanceof HadoopReadOptions) { + conf = ((HadoopReadOptions) options).getConf(); + } + for (String property : options.getPropertyNames()) { + conf.set(property, options.getProperty(property)); + } + + // initialize a ReadContext for this file + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + this.fileSchema = parquetFileMetadata.getSchema(); + Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + ReadSupport.ReadContext readContext = readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema)); + this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); + this.requestedSchema = readContext.getRequestedSchema(); + this.columnCount = requestedSchema.getPaths().size(); + this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext); + this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true); + this.total = reader.getRecordCount(); + this.unmaterializableRecordCounter = new 
UnmaterializableRecordCounter(options, total); + this.filterRecords = options.useRecordFilter(); + reader.setRequestedSchema(requestedSchema); + LOG.info("RecordReader initialized will read a total of {} records.", total); + } + public void initialize(ParquetFileReader reader, Configuration configuration) throws IOException { // initialize a ReadContext for this file @@ -177,8 +206,7 @@ class InternalParquetRecordReader<T> { this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); this.total = reader.getRecordCount(); this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total); - this.filterRecords = configuration.getBoolean( - RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT); + this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true); reader.setRequestedSchema(requestedSchema); LOG.info("RecordReader initialized will read a total of {} records.", total); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 1815bd6..1ace040 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -26,10 +26,6 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; -import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT; -import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT; import java.io.Closeable; import java.io.IOException; @@ -51,17 +47,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.RowGroupFilter; @@ -80,15 +75,14 @@ import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; -import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; import 
org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.hadoop.util.HiddenFileFilter; -import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.io.ParquetDecodingException; @@ -119,6 +113,7 @@ public class ParquetFileReader implements Closeable { * @param partFiles the part files to read * @return the footers for those files using the summary file if possible. * @throws IOException + * @deprecated metadata files are not recommended and will be removed in 2.0.0 */ @Deprecated public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException { @@ -137,7 +132,9 @@ public class ParquetFileReader implements Closeable { * @param skipRowGroups to skipRowGroups in the footers * @return the footers for those files using the summary file if possible. * @throws IOException + * @deprecated metadata files are not recommended and will be removed in 2.0.0 */ + @Deprecated public static List<Footer> readAllFootersInParallelUsingSummaryFiles( final Configuration configuration, final Collection<FileStatus> partFiles, @@ -233,6 +230,9 @@ public class ParquetFileReader implements Closeable { } } + /** + * @deprecated metadata files are not recommended and will be removed in 2.0.0 + */ @Deprecated public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException { return readAllFootersInParallel(configuration, partFiles, false); @@ -246,7 +246,10 @@ public class ParquetFileReader implements Closeable { * @param skipRowGroups to skip the rowGroup info * @return the footers * @throws IOException + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ + @Deprecated public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException { List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>(); for (final FileStatus currentFile : partFiles) { @@ -271,7 +274,10 @@ public class ParquetFileReader implements Closeable { /** * Read the footers of all the files under that path (recursively) * not using summary files. 
+ * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ + @Deprecated public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException { List<FileStatus> statuses = listFiles(configuration, fileStatus); return readAllFootersInParallel(configuration, statuses, skipRowGroups); @@ -285,12 +291,18 @@ public class ParquetFileReader implements Closeable { * @param fileStatus the root dir * @return all the footers * @throws IOException + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ + @Deprecated public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException { return readAllFootersInParallel(configuration, fileStatus, false); } - + /** + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} + */ @Deprecated public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException { return readFooters(configuration, status(configuration, path)); @@ -306,6 +318,8 @@ public class ParquetFileReader implements Closeable { * @param pathStatus * @return * @throws IOException + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ @Deprecated public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException { @@ -319,7 +333,10 @@ public class ParquetFileReader implements Closeable { * @param pathStatus the root dir * @return all the footers * @throws IOException + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ + @Deprecated public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException { List<FileStatus> files = listFiles(configuration, pathStatus); return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups); @@ -345,7 +362,9 @@ public class ParquetFileReader implements Closeable { * @param summaryStatus * @return the metadata translated for each file * @throws IOException + * @deprecated metadata files are not recommended and will be removed in 2.0.0 */ + @Deprecated public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException { final Path parent = summaryStatus.getPath().getParent(); ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false)); @@ -394,6 +413,8 @@ public class ParquetFileReader implements Closeable { * @param file the parquet File * @return the metadata blocks in the footer * @throws IOException if an error occurs while reading the file + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ @Deprecated public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException { @@ -408,13 +429,16 @@ public class ParquetFileReader implements Closeable { * @param filter the filter to apply to row groups * @return the metadata with row groups filtered. 
* @throws IOException if an error occurs while reading the file + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException { return readFooter(HadoopInputFile.fromPath(file, configuration), filter); } /** - * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)} + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ @Deprecated public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException { @@ -428,7 +452,10 @@ public class ParquetFileReader implements Closeable { * @param filter the filter to apply to row groups * @return the metadata blocks in the footer * @throws IOException if an error occurs while reading the file + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ + @Deprecated public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException { return readFooter(HadoopInputFile.fromStatus(file, configuration), filter); } @@ -439,35 +466,32 @@ public class ParquetFileReader implements Closeable { * @param filter the filter to apply to row groups * @return the metadata blocks in the footer * @throws IOException if an error occurs while reading the file + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)} */ - public static final ParquetMetadata readFooter( - InputFile file, MetadataFilter filter) throws IOException { - ParquetMetadataConverter converter; - // TODO: remove this temporary work-around. - // this is necessary to pass the Configuration to ParquetMetadataConverter - // and should be removed when there is a non-Hadoop configuration. 
+ @Deprecated + public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException { + ParquetReadOptions options; if (file instanceof HadoopInputFile) { - converter = new ParquetMetadataConverter( - ((HadoopInputFile) file).getConfiguration()); + options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration()) + .withMetadataFilter(filter).build(); } else { - converter = new ParquetMetadataConverter(); + options = ParquetReadOptions.builder().withMetadataFilter(filter).build(); } - try (SeekableInputStream in = file.newStream()) { - return readFooter(converter, file.getLength(), file.toString(), in, filter); + try (SeekableInputStream in = file.newStream()) { + return readFooter(file, options, in); } } - /** - * Reads the meta data block in the footer of the file using provided input stream - * @param fileLen length of the file - * @param filePath file location - * @param f input stream for the file - * @param filter the filter to apply to row groups - * @return the metadata blocks in the footer - * @throws IOException if an error occurs while reading the file - */ - private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException { + private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { + ParquetMetadataConverter converter = new ParquetMetadataConverter(options); + return readFooter(file, options, f, converter); + } + + private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException { + long fileLen = file.getLength(); + String filePath = file.toString(); LOG.debug("File length {}", fileLen); int FOOTER_LENGTH_SIZE = 4; if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC @@ -489,43 +513,75 @@ public class ParquetFileReader implements Closeable { throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex); } f.seek(footerIndex); - return converter.readParquetMetadata(f, filter); + return converter.readParquetMetadata(f, options.getMetadataFilter()); } + /** + * @deprecated will be removed in 2.0.0; use {@link #open(InputFile)} + */ + @Deprecated public static ParquetFileReader open(Configuration conf, Path file) throws IOException { - return new ParquetFileReader(conf, file); + return new ParquetFileReader(HadoopInputFile.fromPath(file, conf), + HadoopReadOptions.builder(conf).build()); } + /** + * @deprecated will be removed in 2.0.0; use {@link #open(InputFile,ParquetReadOptions)} + */ + @Deprecated public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException { - return new ParquetFileReader(conf, file, filter); + return open(HadoopInputFile.fromPath(file, conf), + HadoopReadOptions.builder(conf).withMetadataFilter(filter).build()); } + /** + * @deprecated will be removed in 2.0.0 + */ + @Deprecated public static ParquetFileReader open(Configuration conf, Path file, ParquetMetadata footer) throws IOException { return new ParquetFileReader(conf, file, footer); } - private final CodecFactory codecFactory; + /** + * Open a {@link InputFile file}. 
+ * + * @param file an input file + * @return an open ParquetFileReader + */ + public static ParquetFileReader open(InputFile file) throws IOException { + return new ParquetFileReader(file, ParquetReadOptions.builder().build()); + } + + /** + * Open a {@link InputFile file} with {@link ParquetReadOptions options}. + * + * @param file an input file + * @return an open ParquetFileReader + */ + public static ParquetFileReader open(InputFile file, ParquetReadOptions options) throws IOException { + return new ParquetFileReader(file, options); + } + + private final InputFile file; private final SeekableInputStream f; - private final FileStatus fileStatus; - private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>(); + private final ParquetReadOptions options; + private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>(); private final FileMetaData fileMetaData; // may be null - private final ByteBufferAllocator allocator; - private final Configuration conf; + private final List<BlockMetaData> blocks; // not final. in some cases, this may be lazily loaded for backward-compat. private ParquetMetadata footer; - // blocks can be filtered after they are read (or set in the constructor) - private List<BlockMetaData> blocks; private int currentBlock = 0; private ColumnChunkPageReadStore currentRowGroup = null; private DictionaryPageReader nextDictionaryReader = null; /** - * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData, - * Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns)} instead + * @deprecated use {@link ParquetFileReader(Configuration,FileMetaData,Path,List,List)} instead. */ - public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException { + @Deprecated + public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, + List<ColumnDescriptor> columns) throws IOException { this(configuration, null, filePath, blocks, columns); } @@ -541,28 +597,14 @@ public class ParquetFileReader implements Closeable { Configuration configuration, FileMetaData fileMetaData, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException { this.converter = new ParquetMetadataConverter(configuration); - this.conf = configuration; + this.file = HadoopInputFile.fromPath(filePath, configuration); this.fileMetaData = fileMetaData; - FileSystem fs = filePath.getFileSystem(configuration); - this.f = HadoopStreams.wrap(fs.open(filePath)); - this.fileStatus = fs.getFileStatus(filePath); - this.blocks = blocks; + this.f = file.newStream(); + this.options = HadoopReadOptions.builder(configuration).build(); + this.blocks = filterRowGroups(blocks); for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } - // the page size parameter isn't meaningful when only using - // the codec factory to get decompressors - this.codecFactory = new CodecFactory(configuration, 0); - this.allocator = new HeapByteBufferAllocator(); - } - - /** - * @param configuration the Hadoop Configuration - * @param file Path to a parquet file - * @throws IOException if the file can not be opened - */ - private ParquetFileReader(Configuration configuration, Path file) throws IOException { - this(configuration, file, NO_FILTER); } /** @@ -570,23 +612,13 @@ public class ParquetFileReader implements Closeable { * @param file Path to a parquet file * @param 
filter a {@link MetadataFilter} for selecting row groups * @throws IOException if the file can not be opened + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileReader(InputFile,MetadataFilter)} instead */ + @Deprecated public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException { - this.converter = new ParquetMetadataConverter(conf); - this.conf = conf; - FileSystem fs = file.getFileSystem(conf); - this.fileStatus = fs.getFileStatus(file); - this.f = HadoopStreams.wrap(fs.open(file)); - this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter); - this.fileMetaData = footer.getFileMetaData(); - this.blocks = footer.getBlocks(); - for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { - paths.put(ColumnPath.get(col.getPath()), col); - } - // the page size parameter isn't meaningful when only using - // the codec factory to get decompressors - this.codecFactory = new CodecFactory(conf, 0); - this.allocator = new HeapByteBufferAllocator(); + this(HadoopInputFile.fromPath(file, conf), + HadoopReadOptions.builder(conf).withMetadataFilter(filter).build()); } /** @@ -595,29 +627,38 @@ public class ParquetFileReader implements Closeable { * @param footer a {@link ParquetMetadata} footer already read from the file * @throws IOException if the file can not be opened */ + @Deprecated public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException { this.converter = new ParquetMetadataConverter(conf); - this.conf = conf; - FileSystem fs = file.getFileSystem(conf); - this.fileStatus = fs.getFileStatus(file); - this.f = HadoopStreams.wrap(fs.open(file)); + this.file = HadoopInputFile.fromPath(file, conf); + this.f = this.file.newStream(); + this.options = HadoopReadOptions.builder(conf).build(); this.footer = footer; this.fileMetaData = footer.getFileMetaData(); - this.blocks = footer.getBlocks(); + this.blocks = filterRowGroups(footer.getBlocks()); + for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + } + + public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { + this.converter = new ParquetMetadataConverter(options); + this.file = file; + this.f = file.newStream(); + this.options = options; + this.footer = readFooter(file, options, f, converter); + this.fileMetaData = footer.getFileMetaData(); + this.blocks = filterRowGroups(footer.getBlocks()); for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } - // the page size parameter isn't meaningful when only using - // the codec factory to get decompressors - this.codecFactory = new CodecFactory(conf, 0); - this.allocator = new HeapByteBufferAllocator(); } public ParquetMetadata getFooter() { if (footer == null) { try { // don't read the row groups because this.blocks is always set - this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS); + this.footer = readFooter(file, options, f, converter); } catch (IOException e) { throw new ParquetDecodingException("Unable to read file footer", e); } @@ -640,25 +681,36 @@ public class ParquetFileReader implements Closeable { return total; } + /** + * @deprecated will be removed in 2.0.0; use {@link #getFile()} instead + */ + @Deprecated public Path getPath() { - return fileStatus.getPath(); + return new 
Path(file.toString()); + } + + public String getFile() { + return file.toString(); } - void filterRowGroups(FilterCompat.Filter filter) throws IOException { + private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException { // set up data filters based on configured levels - List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>(); + List<RowGroupFilter.FilterLevel> levels = new ArrayList<>(); - if (conf.getBoolean( - STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) { + if (options.useStatsFilter()) { levels.add(STATISTICS); } - if (conf.getBoolean( - DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) { + if (options.useDictionaryFilter()) { levels.add(DICTIONARY); } - this.blocks = RowGroupFilter.filterRowGroups(levels, filter, blocks, this); + FilterCompat.Filter recordFilter = options.getRecordFilter(); + if (recordFilter != null) { + return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); + } + + return blocks; } public List<BlockMetaData> getRowGroups() { @@ -785,7 +837,7 @@ public class ParquetFileReader implements Closeable { } DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f); - BytesDecompressor decompressor = codecFactory.getDecompressor(meta.getCodec()); + BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(meta.getCodec()); return new DictionaryPage( decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()), @@ -817,9 +869,7 @@ public class ParquetFileReader implements Closeable { f.close(); } } finally { - if (codecFactory != null) { - codecFactory.release(); - } + options.getCodecFactory().release(); } } @@ -929,7 +979,7 @@ public class ParquetFileReader implements Closeable { " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + " pages ending at file offset " + (descriptor.fileOffset + pos())); } - BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec()); + BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec()); return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage); } @@ -1077,7 +1127,7 @@ public class ParquetFileReader implements Closeable { f.seek(offset); // Allocate the bytebuffer based on whether the FS can support it. 
- ByteBuffer chunksByteBuffer = allocator.allocate(length); + ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length); f.readFully(chunksByteBuffer); // report in a counter the data we just scanned http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 57500bf..da8635d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -23,7 +23,6 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT; import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -36,7 +35,6 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -59,9 +57,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.TypeUtil; @@ -85,22 +87,6 @@ public class ParquetFileWriter { public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; public static final int CURRENT_VERSION = 1; - // need to supply a buffer size when setting block size. this is the default - // for hadoop 1 to present. copying it avoids loading DFSConfigKeys. 
- private static final int DFS_BUFFER_SIZE_DEFAULT = 4096; - - // visible for testing - static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>(); - static { - BLOCK_FS_SCHEMES.add("hdfs"); - BLOCK_FS_SCHEMES.add("webhdfs"); - BLOCK_FS_SCHEMES.add("viewfs"); - } - - private static boolean supportsBlockSize(FileSystem fs) { - return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme()); - } - // File creation modes public static enum Mode { CREATE, @@ -108,7 +94,7 @@ public class ParquetFileWriter { } private final MessageType schema; - private final FSDataOutputStream out; + private final PositionOutputStream out; private final AlignmentStrategy alignment; // file data @@ -193,11 +179,14 @@ public class ParquetFileWriter { * @param schema the schema of the data * @param file the file to write to * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead */ + @Deprecated public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { - this(configuration, schema, file, Mode.CREATE, DEFAULT_BLOCK_SIZE, - MAX_PADDING_SIZE_DEFAULT); + this(HadoopOutputFile.fromPath(file, configuration), + schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); } /** @@ -206,11 +195,14 @@ public class ParquetFileWriter { * @param file the file to write to * @param mode file creation mode * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead */ + @Deprecated public ParquetFileWriter(Configuration configuration, MessageType schema, Path file, Mode mode) throws IOException { - this(configuration, schema, file, mode, DEFAULT_BLOCK_SIZE, - MAX_PADDING_SIZE_DEFAULT); + this(HadoopOutputFile.fromPath(file, configuration), + schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); } /** @@ -219,36 +211,54 @@ public class ParquetFileWriter { * @param file the file to write to * @param mode file creation mode * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0; + * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead */ + @Deprecated public ParquetFileWriter(Configuration configuration, MessageType schema, Path file, Mode mode, long rowGroupSize, int maxPaddingSize) throws IOException { - TypeUtil.checkValidWriteSchema(schema); - this.schema = schema; - FileSystem fs = file.getFileSystem(configuration); - boolean overwriteFlag = (mode == Mode.OVERWRITE); + this(HadoopOutputFile.fromPath(file, configuration), + schema, mode, rowGroupSize, maxPaddingSize); + } - if (supportsBlockSize(fs)) { - // use the default block size, unless row group size is larger - long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize); + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + */ + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, + long rowGroupSize, int maxPaddingSize) + throws IOException { + TypeUtil.checkValidWriteSchema(schema); - this.alignment = PaddingAlignment.get( - dfsBlockSize, rowGroupSize, maxPaddingSize); - 
this.out = fs.create(file, overwriteFlag, DFS_BUFFER_SIZE_DEFAULT, - fs.getDefaultReplication(file), dfsBlockSize); + this.schema = schema; + long blockSize = rowGroupSize; + if (file.supportsBlockSize()) { + blockSize = Math.max(file.defaultBlockSize(), rowGroupSize); + this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize); } else { this.alignment = NoAlignment.get(rowGroupSize); - this.out = fs.create(file, overwriteFlag); + } + + if (mode == Mode.OVERWRITE) { + this.out = file.createOrOverwrite(blockSize); + } else { + this.out = file.create(blockSize); } this.encodingStatsBuilder = new EncodingStats.Builder(); } /** - * FOR TESTING ONLY. + * FOR TESTING ONLY. This supports testing block padding behavior on the local FS. * * @param configuration Hadoop configuration * @param schema the schema of the data @@ -263,11 +273,10 @@ public class ParquetFileWriter { this.schema = schema; this.alignment = PaddingAlignment.get( rowAndBlockSize, rowAndBlockSize, maxPaddingSize); - this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT, - fs.getDefaultReplication(file), rowAndBlockSize); + this.out = HadoopStreams.wrap( + fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize)); this.encodingStatsBuilder = new EncodingStats.Builder(); } - /** * start the file * @throws IOException @@ -490,10 +499,23 @@ public class ParquetFileWriter { currentBlock = null; } + /** + * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead + */ + @Deprecated public void appendFile(Configuration conf, Path file) throws IOException { ParquetFileReader.open(conf, file).appendTo(this); } + public void appendFile(InputFile file) throws IOException { + ParquetFileReader.open(file).appendTo(this); + } + + /** + * @deprecated will be removed in 2.0.0; + * use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead + */ + @Deprecated public void appendRowGroups(FSDataInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns) throws IOException { @@ -508,13 +530,18 @@ public class ParquetFileWriter { } } + /** + * @deprecated will be removed in 2.0.0; + * use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead + */ + @Deprecated public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { - appendRowGroup(from, rowGroup, dropColumns); + appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns); } public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, - boolean dropColumns) throws IOException { + boolean dropColumns) throws IOException { startBlock(rowGroup.getRowCount()); Map<String, ColumnChunkMetaData> columnsToCopy = @@ -603,13 +630,13 @@ public class ParquetFileWriter { /** * Copy from a FS input stream to an output stream. 
@@ -603,13 +630,13 @@ public class ParquetFileWriter {

  /**
   * Copy from a FS input stream to an output stream. Thread-safe
   *
-   * @param from a {@link FSDataInputStream}
-   * @param to any {@link OutputStream}
+   * @param from a {@link SeekableInputStream}
+   * @param to any {@link PositionOutputStream}
   * @param start where in the from stream to start copying
   * @param length the number of bytes to copy
   * @throws IOException
   */
-  private static void copy(SeekableInputStream from, FSDataOutputStream to,
+  private static void copy(SeekableInputStream from, PositionOutputStream to,
                           long start, long length) throws IOException{
    LOG.debug("Copying {} bytes at {} to {}" ,length , start , to.getPos());
    from.seek(start);
@@ -642,7 +669,7 @@ public class ParquetFileWriter {
    out.close();
  }

-  private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
+  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
    long footerIndex = out.getPos();
    org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
    writeFileMetaData(parquetMetadata, out);
@@ -654,7 +681,9 @@ public class ParquetFileWriter {
  /**
   * Given a list of metadata files, merge them into a single ParquetMetadata
   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
   */
+  @Deprecated
  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
    Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
@@ -677,7 +706,9 @@ public class ParquetFileWriter {
   * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
   * This is useful when merging 2 directories of parquet files into a single directory, as long
   * as both directories were written with compatible schemas and equal extraMetaData.
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
   */
+  @Deprecated
  public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
    ParquetMetadata merged = mergeMetadataFiles(files, conf);
    writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
@@ -688,8 +719,8 @@ public class ParquetFileWriter {
   * @param configuration the configuration to use to get the FileSystem
   * @param outputPath the directory to write the _metadata file to
   * @param footers the list of footers to merge
-   * @deprecated use the variant of writeMetadataFile that takes a {@link JobSummaryLevel} as an argument.
   * @throws IOException
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
   */
  @Deprecated
  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
@@ -698,7 +729,9 @@ public class ParquetFileWriter {

  /**
   * writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
   */
+  @Deprecated
  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
    Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY, "Unsupported level: " + level);
@@ -715,15 +748,23 @@ public class ParquetFileWriter {
    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
  }

+  /**
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
  private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile) throws IOException {
    Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
    writeMetadataFile(metaDataPath, metadataFooter, fs);
  }

+  /**
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs) throws IOException {
-    FSDataOutputStream metadata = fs.create(outputPath);
+    PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
    metadata.write(MAGIC);
    serializeFooter(metadataFooter, metadata);
    metadata.close();
@@ -850,9 +891,9 @@ public class ParquetFileWriter {
  }

  private interface AlignmentStrategy {
-    void alignForRowGroup(FSDataOutputStream out) throws IOException;
+    void alignForRowGroup(PositionOutputStream out) throws IOException;

-    long nextRowGroupSize(FSDataOutputStream out) throws IOException;
+    long nextRowGroupSize(PositionOutputStream out) throws IOException;
  }

  private static class NoAlignment implements AlignmentStrategy {
@@ -867,11 +908,11 @@
    }

    @Override
-    public void alignForRowGroup(FSDataOutputStream out) {
+    public void alignForRowGroup(PositionOutputStream out) {
    }

    @Override
-    public long nextRowGroupSize(FSDataOutputStream out) {
+    public long nextRowGroupSize(PositionOutputStream out) {
      return rowGroupSize;
    }
  }
@@ -900,7 +941,7 @@
    }

    @Override
-    public void alignForRowGroup(FSDataOutputStream out) throws IOException {
+    public void alignForRowGroup(PositionOutputStream out) throws IOException {
      long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);

      if (isPaddingNeeded(remaining)) {
@@ -912,7 +953,7 @@
    }

    @Override
-    public long nextRowGroupSize(FSDataOutputStream out) throws IOException {
+    public long nextRowGroupSize(PositionOutputStream out) throws IOException {
      if (maxPaddingSize <= 0) {
        return rowGroupSize;
      }
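[Editor's note] The alignment hunks above only switch the stream type; the visible padding logic is that PaddingAlignment pads a row group out to the next block boundary when the space left in the current block is small. A rough standalone paraphrase of that decision, under the assumption (not shown in the hunks) that padding is applied only when the remaining space is at most maxPaddingSize:

// Paraphrase of the padding decision; names and threshold rule are illustrative,
// not the patch's internal implementation.
static long paddingNeeded(long pos, long blockSize, long maxPaddingSize) {
  long remaining = blockSize - (pos % blockSize);   // space left in the current block
  if (remaining <= maxPaddingSize) {
    return remaining;                               // pad to the block boundary
  }
  return 0;                                         // enough room: start the row group here
}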
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 7c5b5be..979388d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -120,19 +120,16 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   * key to configure whether record-level filtering is enabled
   */
  public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
-  static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;

  /**
   * key to configure whether row group stats filtering is enabled
   */
  public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
-  static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;

  /**
   * key to configure whether row group dictionary filtering is enabled
   */
  public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
-  static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;

  /**
   * key to turn on or off task side metadata loading (default true)

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 78af765..340ec11 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
import org.apache.parquet.hadoop.codec.CodecConfig;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -383,8 +384,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
    }

    WriteContext init = writeSupport.init(conf);
-    ParquetFileWriter w = new ParquetFileWriter(
-        conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
+    ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
+        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
    w.start();

    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
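[Editor's note] The reader-side classes in the next two diffs (ParquetReader and ParquetRecordReader) stop threading a raw Configuration plus MetadataFilter around and instead assemble a ParquetReadOptions via HadoopReadOptions.builder before opening the file. A sketch of that pattern as it appears in the patch; the file path, the NOOP filter, and the range values are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Hypothetical helper, not part of the patch.
public class ReadOptionsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Assemble read options the way the updated reader classes do.
    ParquetReadOptions options = HadoopReadOptions.builder(conf)
        .withRecordFilter(FilterCompat.NOOP)   // placeholder record filter
        .withRange(0, Long.MAX_VALUE)          // placeholder split range
        .build();

    // Open a file reader from an InputFile plus options instead of a raw Configuration.
    ParquetFileReader fileReader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path("/tmp/in.parquet"), conf), options);
    fileReader.close();
  }
}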
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index ff9c811..1ba5380 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -22,7 +22,8 @@ import static org.apache.parquet.Preconditions.checkNotNull;

import java.io.Closeable;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -31,12 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.InputFile;

/**
 * Read records from a Parquet file.
@@ -45,9 +51,8 @@ import org.apache.parquet.hadoop.util.HiddenFileFilter;
public class ParquetReader<T> implements Closeable {

  private final ReadSupport<T> readSupport;
-  private final Configuration conf;
-  private final Iterator<Footer> footersIterator;
-  private final Filter filter;
+  private final Iterator<InputFile> filesIterator;
+  private final ParquetReadOptions options;

  private InternalParquetRecordReader<T> reader;
@@ -100,17 +105,22 @@
  }

  private ParquetReader(Configuration conf,
-                        Path file,
-                        ReadSupport<T> readSupport,
-                        Filter filter) throws IOException {
-    this.readSupport = readSupport;
-    this.filter = checkNotNull(filter, "filter");
-    this.conf = conf;
+                        Path file,
+                        ReadSupport<T> readSupport,
+                        FilterCompat.Filter filter) throws IOException {
+    this(Collections.singletonList((InputFile) HadoopInputFile.fromPath(file, conf)),
+        HadoopReadOptions.builder(conf)
+            .withRecordFilter(checkNotNull(filter, "filter"))
+            .build(),
+        readSupport);
+  }

-    FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
-    this.footersIterator = footers.iterator();
+  private ParquetReader(List<InputFile> files,
+                        ParquetReadOptions options,
+                        ReadSupport<T> readSupport) throws IOException {
+    this.readSupport = readSupport;
+    this.options = options;
+    this.filesIterator = files.iterator();
  }

  /**
@@ -135,18 +145,15 @@
      reader.close();
      reader = null;
    }
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
-      ParquetFileReader fileReader = ParquetFileReader.open(
-          conf, footer.getFile(), footer.getParquetMetadata());
+    if (filesIterator.hasNext()) {
+      InputFile file = filesIterator.next();
-      // apply data filters
-      fileReader.filterRowGroups(filter);
+      ParquetFileReader fileReader = ParquetFileReader.open(file, options);
-      reader = new InternalParquetRecordReader<T>(readSupport, filter);
+      reader = new InternalParquetRecordReader<>(readSupport, options.getRecordFilter());
-      reader.initialize(fileReader, conf);
+      reader.initialize(fileReader, options);
    }
  }
@@ -157,37 +164,114 @@
    }
  }

+  public static <T> Builder<T> read(InputFile file) throws IOException {
+    return new Builder<>(file);
+  }
+
  public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
-    return new Builder<T>(readSupport, path);
+    return new Builder<>(readSupport, path);
  }

  public static class Builder<T> {
    private final ReadSupport<T> readSupport;
-    private final Path file;
-    private Filter filter;
-    protected Configuration conf;
+    private final InputFile file;
+    private final Path path;
+    private Filter filter = null;
+    protected Configuration conf = new Configuration();
+    private ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);

+    @Deprecated
    private Builder(ReadSupport<T> readSupport, Path path) {
      this.readSupport = checkNotNull(readSupport, "readSupport");
-      this.file = checkNotNull(path, "path");
-      this.conf = new Configuration();
-      this.filter = FilterCompat.NOOP;
+      this.file = null;
+      this.path = checkNotNull(path, "path");
    }

+    @Deprecated
    protected Builder(Path path) {
      this.readSupport = null;
-      this.file = checkNotNull(path, "path");
-      this.conf = new Configuration();
-      this.filter = FilterCompat.NOOP;
+      this.file = null;
+      this.path = checkNotNull(path, "path");
    }

+    protected Builder(InputFile file) {
+      this.readSupport = null;
+      this.file = checkNotNull(file, "file");
+      this.path = null;
+    }
+
+    // when called, resets options to the defaults from conf
    public Builder<T> withConf(Configuration conf) {
      this.conf = checkNotNull(conf, "conf");
+
+      // previous versions didn't use the builder, so may set filter before conf. this maintains
+      // compatibility for filter. other options are reset by a new conf.
+      this.optionsBuilder = HadoopReadOptions.builder(conf);
+      if (filter != null) {
+        optionsBuilder.withRecordFilter(filter);
+      }
+
      return this;
    }

    public Builder<T> withFilter(Filter filter) {
-      this.filter = checkNotNull(filter, "filter");
+      this.filter = filter;
+      optionsBuilder.withRecordFilter(filter);
+      return this;
+    }
+
+    public Builder<T> useSignedStringMinMax(boolean useSignedStringMinMax) {
+      optionsBuilder.useSignedStringMinMax(useSignedStringMinMax);
+      return this;
+    }
+
+    public Builder<T> useSignedStringMinMax() {
+      optionsBuilder.useSignedStringMinMax();
+      return this;
+    }
+
+    public Builder<T> useStatsFilter(boolean useStatsFilter) {
+      optionsBuilder.useStatsFilter(useStatsFilter);
+      return this;
+    }
+
+    public Builder<T> useStatsFilter() {
+      optionsBuilder.useStatsFilter();
+      return this;
+    }
+
+    public Builder<T> useDictionaryFilter(boolean useDictionaryFilter) {
+      optionsBuilder.useDictionaryFilter(useDictionaryFilter);
+      return this;
+    }
+
+    public Builder<T> useDictionaryFilter() {
+      optionsBuilder.useDictionaryFilter();
+      return this;
+    }
+
+    public Builder<T> useRecordFilter(boolean useRecordFilter) {
+      optionsBuilder.useRecordFilter(useRecordFilter);
+      return this;
+    }
+
+    public Builder<T> useRecordFilter() {
+      optionsBuilder.useRecordFilter();
+      return this;
+    }
+
+    public Builder<T> withFileRange(long start, long end) {
+      optionsBuilder.withRange(start, end);
+      return this;
+    }
+
+    public Builder<T> withCodecFactory(CompressionCodecFactory codecFactory) {
+      optionsBuilder.withCodecFactory(codecFactory);
+      return this;
+    }
+
+    public Builder<T> set(String key, String value) {
+      optionsBuilder.set(key, value);
      return this;
    }
@@ -199,7 +283,29 @@
    }

    public ParquetReader<T> build() throws IOException {
-      return new ParquetReader<T>(conf, file, getReadSupport(), filter);
+      ParquetReadOptions options = optionsBuilder.build();
+
+      if (path != null) {
+        FileSystem fs = path.getFileSystem(conf);
+        FileStatus stat = fs.getFileStatus(path);
+
+        if (stat.isFile()) {
+          return new ParquetReader<>(
+              Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, conf)),
+              options,
+              getReadSupport());
+
+        } else {
+          List<InputFile> files = new ArrayList<>();
+          for (FileStatus fileStatus : fs.listStatus(path, HiddenFileFilter.INSTANCE)) {
+            files.add(HadoopInputFile.fromStatus(fileStatus, conf));
+          }
+          return new ParquetReader<T>(files, options, getReadSupport());
+        }
+
+      } else {
+        return new ParquetReader<>(Collections.singletonList(file), options, getReadSupport());
+      }
    }
  }
}
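[Editor's note] The expanded Builder above exposes the new read options as fluent methods that simply delegate to the options builder. A sketch of the usage this enables, using the example GroupReadSupport that ships with parquet-hadoop; the path, the range, and the set(...) key/value are placeholders, and withConf resets the options to that configuration's defaults as noted in the hunk above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

// Hypothetical helper, not part of the patch.
public class BuilderOptionsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/data/table"))
            .withConf(conf)                    // resets options to this conf's defaults
            .useStatsFilter(true)              // row group stats filtering
            .useDictionaryFilter(true)         // row group dictionary filtering
            .useRecordFilter(true)             // record-level filtering
            .withFileRange(0, Long.MAX_VALUE)  // placeholder range
            .set("parquet.example.property", "value")   // placeholder key/value passthrough
            .build();

    Group record;
    while ((record = reader.read()) != null) {
      // process record
    }
    reader.close();
  }
}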
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index ebdc686..9ca8be9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -18,14 +18,9 @@
 */
package org.apache.parquet.hadoop;

-import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.*;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;

import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -37,21 +32,21 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
-import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
@@ -158,13 +153,16 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
    long[] rowGroupOffsets = split.getRowGroupOffsets();

    // if task.side.metadata is set, rowGroupOffsets is null
-    MetadataFilter metadataFilter = (rowGroupOffsets != null ?
-        offsets(rowGroupOffsets) :
-        range(split.getStart(), split.getEnd()));
+    ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration);
+    if (rowGroupOffsets != null) {
+      optionsBuilder.withOffsets(rowGroupOffsets);
+    } else {
+      optionsBuilder.withRange(split.getStart(), split.getEnd());
+    }

    // open a reader with the metadata filter
    ParquetFileReader reader = ParquetFileReader.open(
-        configuration, path, metadataFilter);
+        HadoopInputFile.fromPath(path, configuration), optionsBuilder.build());

    if (rowGroupOffsets != null) {
      // verify a row group was found for each offset
@@ -175,10 +173,6 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
            + " expected: " + Arrays.toString(rowGroupOffsets)
            + " found: " + blocks);
      }
-
-    } else {
-      // apply data filters
-      reader.filterRowGroups(getFilter(configuration));
    }

    if (!reader.getRowGroups().isEmpty()) {