http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
new file mode 100644
index 0000000..87c8ac9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+
+import java.util.Map;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
+
+public class HadoopReadOptions extends ParquetReadOptions {
+  private final Configuration conf;
+
+  private HadoopReadOptions(boolean useSignedStringMinMax,
+                            boolean useStatsFilter,
+                            boolean useDictionaryFilter,
+                            boolean useRecordFilter,
+                            FilterCompat.Filter recordFilter,
+                            MetadataFilter metadataFilter,
+                            CompressionCodecFactory codecFactory,
+                            ByteBufferAllocator allocator,
+                            Map<String, String> properties,
+                            Configuration conf) {
+    super(
+        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
+        metadataFilter, codecFactory, allocator, properties
+    );
+    this.conf = conf;
+  }
+
+  @Override
+  public String getProperty(String property) {
+    String value = super.getProperty(property);
+    if (value != null) {
+      return value;
+    }
+    return conf.get(property);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static Builder builder(Configuration conf) {
+    return new Builder(conf);
+  }
+
+  public static class Builder extends ParquetReadOptions.Builder {
+    private final Configuration conf;
+
+    public Builder(Configuration conf) {
+      this.conf = conf;
+      useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+      useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+      useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
+      useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+      withCodecFactory(HadoopCodecs.newFactory(conf, 0));
+      withRecordFilter(getFilter(conf));
+      String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
+      if (badRecordThresh != null) {
+        set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
+      }
+    }
+
+    @Override
+    public ParquetReadOptions build() {
+      return new HadoopReadOptions(
+          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+          recordFilter, metadataFilter, codecFactory, allocator, properties, conf);
+    }
+  }
+}
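
[Reviewer note] A minimal usage sketch of the class above, not part of the patch; the custom property key is hypothetical. Values not set on the builder fall back to the Hadoop Configuration through the overridden getProperty:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;

    public class HadoopReadOptionsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("parquet.example.custom-key", "custom-value"); // hypothetical app-specific key

        // filtering flags are read from conf in the Builder constructor
        ParquetReadOptions options = HadoopReadOptions.builder(conf)
            .useStatsFilter(true)
            .build();

        // not set via Builder.set(...), so this falls through to conf.get(...)
        System.out.println(options.getProperty("parquet.example.custom-key"));
      }
    }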

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
new file mode 100644
index 0000000..5f2f0a8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -0,0 +1,232 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+// Internal use only
+public class ParquetReadOptions {
+  private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
+  private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
+  private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+
+  private final boolean useSignedStringMinMax;
+  private final boolean useStatsFilter;
+  private final boolean useDictionaryFilter;
+  private final boolean useRecordFilter;
+  private final FilterCompat.Filter recordFilter;
+  private final ParquetMetadataConverter.MetadataFilter metadataFilter;
+  private final CompressionCodecFactory codecFactory;
+  private final ByteBufferAllocator allocator;
+  private final Map<String, String> properties;
+
+  ParquetReadOptions(boolean useSignedStringMinMax,
+                             boolean useStatsFilter,
+                             boolean useDictionaryFilter,
+                             boolean useRecordFilter,
+                             FilterCompat.Filter recordFilter,
+                             ParquetMetadataConverter.MetadataFilter metadataFilter,
+                             CompressionCodecFactory codecFactory,
+                             ByteBufferAllocator allocator,
+                             Map<String, String> properties) {
+    this.useSignedStringMinMax = useSignedStringMinMax;
+    this.useStatsFilter = useStatsFilter;
+    this.useDictionaryFilter = useDictionaryFilter;
+    this.useRecordFilter = useRecordFilter;
+    this.recordFilter = recordFilter;
+    this.metadataFilter = metadataFilter;
+    this.codecFactory = codecFactory;
+    this.allocator = allocator;
+    this.properties = Collections.unmodifiableMap(properties);
+  }
+
+  public boolean useSignedStringMinMax() {
+    return useSignedStringMinMax;
+  }
+
+  public boolean useStatsFilter() {
+    return useStatsFilter;
+  }
+
+  public boolean useDictionaryFilter() {
+    return useDictionaryFilter;
+  }
+
+  public boolean useRecordFilter() {
+    return useRecordFilter;
+  }
+
+  public FilterCompat.Filter getRecordFilter() {
+    return recordFilter;
+  }
+
+  public ParquetMetadataConverter.MetadataFilter getMetadataFilter() {
+    return metadataFilter;
+  }
+
+  public CompressionCodecFactory getCodecFactory() {
+    return codecFactory;
+  }
+
+  public ByteBufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public Set<String> getPropertyNames() {
+    return properties.keySet();
+  }
+
+  public String getProperty(String property) {
+    return properties.get(property);
+  }
+
+  public boolean isEnabled(String property, boolean defaultValue) {
+    if (properties.containsKey(property)) {
+      return Boolean.valueOf(properties.get(property));
+    } else {
+      return defaultValue;
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    boolean useSignedStringMinMax = false;
+    boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
+    boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
+    boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+    FilterCompat.Filter recordFilter = null;
+    ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
+    // the page size parameter isn't used when only using the codec factory to get decompressors
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    Map<String, String> properties = new HashMap<>();
+
+    public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
+      this.useSignedStringMinMax = useSignedStringMinMax;
+      return this;
+    }
+
+    public Builder useSignedStringMinMax() {
+      this.useSignedStringMinMax = true;
+      return this;
+    }
+
+    public Builder useStatsFilter(boolean useStatsFilter) {
+      this.useStatsFilter = useStatsFilter;
+      return this;
+    }
+
+    public Builder useStatsFilter() {
+      this.useStatsFilter = true;
+      return this;
+    }
+
+    public Builder useDictionaryFilter(boolean useDictionaryFilter) {
+      this.useDictionaryFilter = useDictionaryFilter;
+      return this;
+    }
+
+    public Builder useDictionaryFilter() {
+      this.useDictionaryFilter = true;
+      return this;
+    }
+
+    public Builder useRecordFilter(boolean useRecordFilter) {
+      this.useRecordFilter = useRecordFilter;
+      return this;
+    }
+
+    public Builder useRecordFilter() {
+      this.useRecordFilter = true;
+      return this;
+    }
+
+    public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
+      this.recordFilter = rowGroupFilter;
+      return this;
+    }
+
+    public Builder withRange(long start, long end) {
+      this.metadataFilter = ParquetMetadataConverter.range(start, end);
+      return this;
+    }
+
+    public Builder withOffsets(long... rowGroupOffsets) {
+      this.metadataFilter = ParquetMetadataConverter.offsets(rowGroupOffsets);
+      return this;
+    }
+
+    public Builder withMetadataFilter(ParquetMetadataConverter.MetadataFilter metadataFilter) {
+      this.metadataFilter = metadataFilter;
+      return this;
+    }
+
+    public Builder withCodecFactory(CompressionCodecFactory codecFactory) {
+      this.codecFactory = codecFactory;
+      return this;
+    }
+
+    public Builder withAllocator(ByteBufferAllocator allocator) {
+      this.allocator = allocator;
+      return this;
+    }
+
+    public Builder set(String key, String value) {
+      properties.put(key, value);
+      return this;
+    }
+
+    public Builder copy(ParquetReadOptions options) {
+      useSignedStringMinMax(options.useSignedStringMinMax);
+      useStatsFilter(options.useStatsFilter);
+      useDictionaryFilter(options.useDictionaryFilter);
+      useRecordFilter(options.useRecordFilter);
+      withRecordFilter(options.recordFilter);
+      withMetadataFilter(options.metadataFilter);
+      withCodecFactory(options.codecFactory);
+      withAllocator(options.allocator);
+      for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
+        set(keyValue.getKey(), keyValue.getValue());
+      }
+      return this;
+    }
+
+    public ParquetReadOptions build() {
+      return new ParquetReadOptions(
+          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+          recordFilter, metadataFilter, codecFactory, allocator, properties);
+    }
+  }
+}
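
[Reviewer note] For comparison, a Hadoop-free sketch of the base builder; the byte range and property key below are made up for illustration:

    import org.apache.parquet.ParquetReadOptions;

    public class ParquetReadOptionsExample {
      public static void main(String[] args) {
        ParquetReadOptions options = ParquetReadOptions.builder()
            .useStatsFilter()                    // statistics-based row group filtering
            .useDictionaryFilter(false)          // turn dictionary filtering off
            .withRange(0, 128 * 1024 * 1024)     // only consider row groups in this byte range
            .set("parquet.example.key", "true")  // free-form string property
            .build();

        // isEnabled parses the stored string, falling back to the supplied default
        System.out.println(options.isEnabled("parquet.example.key", false)); // true
        System.out.println(options.useStatsFilter());                        // true
      }
    }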

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index fd74799..68c38ce 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -51,6 +51,10 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
     DICTIONARY
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0.
+   */
+  @Deprecated
   public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
     checkNotNull(filter, "filter");
     return filter.accept(new RowGroupFilter(blocks, schema));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index aeb6152..bba7e62 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -38,6 +38,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.format.PageEncodingStats;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.ColumnChunk;
@@ -89,10 +91,18 @@ public class ParquetMetadataConverter {
     this(false);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)}
+   */
+  @Deprecated
   public ParquetMetadataConverter(Configuration conf) {
     this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
   }
 
+  public ParquetMetadataConverter(ParquetReadOptions options) {
+    this(options.useSignedStringMinMax());
+  }
+
   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
     this.useSignedStringMinMax = useSignedStringMinMax;
   }
@@ -193,7 +203,7 @@ public class ParquetMetadataConverter {
           getType(columnMetaData.getType()),
           toFormatEncodings(columnMetaData.getEncodings()),
           Arrays.asList(columnMetaData.getPath().toArray()),
-          columnMetaData.getCodec().getParquetCompressionCodec(),
+          toFormatCodec(columnMetaData.getCodec()),
           columnMetaData.getValueCount(),
           columnMetaData.getTotalUncompressedSize(),
           columnMetaData.getTotalSize(),
@@ -246,6 +256,14 @@ public class ParquetMetadataConverter {
     return cached;
   }
 
+  private CompressionCodecName fromFormatCodec(CompressionCodec codec) {
+    return CompressionCodecName.valueOf(codec.toString());
+  }
+
+  private CompressionCodec toFormatCodec(CompressionCodecName codec) {
+    return CompressionCodec.valueOf(codec.toString());
+  }
+
   public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
     return org.apache.parquet.column.Encoding.valueOf(encoding.name());
   }
@@ -820,7 +838,7 @@ public class ParquetMetadataConverter {
           ColumnChunkMetaData column = ColumnChunkMetaData.get(
              path,
              messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
-              CompressionCodecName.fromParquet(metaData.codec),
+              fromFormatCodec(metaData.codec),
               convertEncodingStats(metaData.getEncoding_stats()),
               fromFormatEncodings(metaData.encodings),
               fromParquetStatistics(
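
[Reviewer note] The Configuration-based constructor is deprecated in favour of the options-based one added here; a small illustrative sketch:

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;

    public class MetadataConverterExample {
      public static void main(String[] args) {
        // signed string min/max handling is now carried by ParquetReadOptions
        ParquetReadOptions options = ParquetReadOptions.builder()
            .useSignedStringMinMax()
            .build();
        ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
        System.out.println(converter.getClass().getName());
      }
    }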

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 8bf882f..8befa79 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -36,9 +36,10 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
-public class CodecFactory {
+public class CodecFactory implements CompressionCodecFactory {
 
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
@@ -118,7 +119,7 @@ public class CodecFactory {
       output.put(decompressed);
     }
 
-    protected void release() {
+    public void release() {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
       }
@@ -171,7 +172,7 @@ public class CodecFactory {
     }
 
     @Override
-    protected void release() {
+    public void release() {
       if (compressor != null) {
         CodecPool.returnCompressor(compressor);
       }
@@ -183,6 +184,7 @@ public class CodecFactory {
 
   }
 
+  @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
     BytesCompressor comp = compressors.get(codecName);
     if (comp == null) {
@@ -192,6 +194,7 @@ public class CodecFactory {
     return comp;
   }
 
+  @Override
   public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
     BytesDecompressor decomp = decompressors.get(codecName);
     if (decomp == null) {
@@ -235,6 +238,7 @@ public class CodecFactory {
     }
   }
 
+  @Override
   public void release() {
     for (BytesCompressor compressor : compressors.values()) {
       compressor.release();
@@ -246,15 +250,23 @@ public class CodecFactory {
     decompressors.clear();
   }
 
-  public static abstract class BytesCompressor {
+  /**
+   * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputCompressor instead.
+   */
+  @Deprecated
+  public static abstract class BytesCompressor implements CompressionCodecFactory.BytesInputCompressor {
     public abstract BytesInput compress(BytesInput bytes) throws IOException;
     public abstract CompressionCodecName getCodecName();
-    protected abstract void release();
+    public abstract void release();
   }
 
-  public static abstract class BytesDecompressor {
+  /**
+   * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputDecompressor instead.
+   */
+  @Deprecated
+  public static abstract class BytesDecompressor implements CompressionCodecFactory.BytesInputDecompressor {
     public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
     public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
-    protected abstract void release();
+    public abstract void release();
   }
 }
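
[Reviewer note] With CodecFactory now implementing the codec-agnostic interface, callers can hold a CompressionCodecFactory instead of the Hadoop-specific class. A sketch, with UNCOMPRESSED chosen only to keep the example self-contained:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopCodecs;

    public class CodecFactoryExample {
      public static void main(String[] args) {
        // page size 0 is fine when the factory is only used for decompressors
        CompressionCodecFactory codecs = HadoopCodecs.newFactory(new Configuration(), 0);
        try {
          CompressionCodecFactory.BytesInputDecompressor decompressor =
              codecs.getDecompressor(CompressionCodecName.UNCOMPRESSED);
          // decompressor.decompress(bytesInput, uncompressedSize) would be called per page
          System.out.println(decompressor.getClass().getName());
        } finally {
          codecs.release(); // release() is now public on compressors and decompressors too
        }
      }
    }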

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index f067679..37dfd6d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -33,6 +33,8 @@ import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
@@ -56,12 +58,12 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
    */
   static final class ColumnChunkPageReader implements PageReader {
 
-    private final BytesDecompressor decompressor;
+    private final BytesInputDecompressor decompressor;
     private final long valueCount;
     private final List<DataPage> compressedPages;
     private final DictionaryPage compressedDictionaryPage;
 
-    ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
+    ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
       this.decompressor = decompressor;
       this.compressedPages = new LinkedList<DataPage>(compressedPages);
       this.compressedDictionaryPage = compressedDictionaryPage;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index 344f3ec..58e79ac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -179,7 +179,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {
+    public void release() {
       DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
     }
   }
@@ -221,7 +221,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {
+    public void release() {
       DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor);
       extraDecompressor.release();
     }
@@ -245,7 +245,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {}
+    public void release() {}
 
   }
 
@@ -269,7 +269,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {}
+    public void release() {}
   }
 
   public class SnappyCompressor extends BytesCompressor {
@@ -311,7 +311,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {
+    public void release() {
       outgoing = DirectCodecFactory.this.release(outgoing);
       incoming = DirectCodecFactory.this.release(incoming);
     }
@@ -333,7 +333,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     }
 
     @Override
-    protected void release() {}
+    public void release() {}
   }
 
   static class DirectCodecPool {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 88b3d2d..a048878 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -27,6 +27,8 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -47,7 +49,6 @@ import org.slf4j.LoggerFactory;
 import static java.lang.String.format;
 import static org.apache.parquet.Preconditions.checkNotNull;
 import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
 import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 class InternalParquetRecordReader<T> {
@@ -160,6 +161,34 @@ class InternalParquetRecordReader<T> {
     return (float) current / total;
   }
 
+  public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
+    // copy custom configuration to the Configuration passed to the ReadSupport
+    Configuration conf = new Configuration();
+    if (options instanceof HadoopReadOptions) {
+      conf = ((HadoopReadOptions) options).getConf();
+    }
+    for (String property : options.getPropertyNames()) {
+      conf.set(property, options.getProperty(property));
+    }
+
+    // initialize a ReadContext for this file
+    this.reader = reader;
+    FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+    this.fileSchema = parquetFileMetadata.getSchema();
+    Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema));
+    this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+    this.requestedSchema = readContext.getRequestedSchema();
+    this.columnCount = requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
+    this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
+    this.total = reader.getRecordCount();
+    this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
+    this.filterRecords = options.useRecordFilter();
+    reader.setRequestedSchema(requestedSchema);
+    LOG.info("RecordReader initialized will read a total of {} records.", total);
+  }
+
   public void initialize(ParquetFileReader reader, Configuration configuration)
       throws IOException {
     // initialize a ReadContext for this file
@@ -177,8 +206,7 @@ class InternalParquetRecordReader<T> {
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     this.total = reader.getRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
-    this.filterRecords = configuration.getBoolean(
-        RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
+    this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
     reader.setRequestedSchema(requestedSchema);
     LOG.info("RecordReader initialized will read a total of {} records.", total);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1815bd6..1ace040 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -26,10 +26,6 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -51,17 +47,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
 
@@ -80,15 +75,14 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
-import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -119,6 +113,7 @@ public class ParquetFileReader implements Closeable {
    * @param partFiles the part files to read
    * @return the footers for those files using the summary file if possible.
    * @throws IOException
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
    */
   @Deprecated
   public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
@@ -137,7 +132,9 @@ public class ParquetFileReader implements Closeable {
    * @param skipRowGroups to skipRowGroups in the footers
    * @return the footers for those files using the summary file if possible.
    * @throws IOException
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
    */
+  @Deprecated
   public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
       final Configuration configuration,
       final Collection<FileStatus> partFiles,
@@ -233,6 +230,9 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  /**
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
   @Deprecated
   public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
     return readAllFootersInParallel(configuration, partFiles, false);
@@ -246,7 +246,10 @@ public class ParquetFileReader implements Closeable {
    * @param skipRowGroups to skip the rowGroup info
    * @return the footers
    * @throws IOException
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
+  @Deprecated
   public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
     List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
     for (final FileStatus currentFile : partFiles) {
@@ -271,7 +274,10 @@ public class ParquetFileReader implements Closeable {
   /**
    * Read the footers of all the files under that path (recursively)
    * not using summary files.
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
+  @Deprecated
   public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException {
     List<FileStatus> statuses = listFiles(configuration, fileStatus);
     return readAllFootersInParallel(configuration, statuses, skipRowGroups);
@@ -285,12 +291,18 @@ public class ParquetFileReader implements Closeable {
    * @param fileStatus the root dir
    * @return all the footers
    * @throws IOException
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
+  @Deprecated
   public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
     return readAllFootersInParallel(configuration, fileStatus, false);
   }
 
-
+  /**
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
+   */
   @Deprecated
   public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
     return readFooters(configuration, status(configuration, path));
@@ -306,6 +318,8 @@ public class ParquetFileReader implements Closeable {
    * @param pathStatus
    * @return
    * @throws IOException
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
   @Deprecated
   public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
@@ -319,7 +333,10 @@ public class ParquetFileReader implements Closeable {
    * @param pathStatus the root dir
    * @return all the footers
    * @throws IOException
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
+  @Deprecated
   public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
     List<FileStatus> files = listFiles(configuration, pathStatus);
     return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
@@ -345,7 +362,9 @@ public class ParquetFileReader implements Closeable {
    * @param summaryStatus
    * @return the metadata translated for each file
    * @throws IOException
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
    */
+  @Deprecated
   public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
     final Path parent = summaryStatus.getPath().getParent();
     ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
@@ -394,6 +413,8 @@ public class ParquetFileReader implements Closeable {
    * @param file the parquet File
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
   @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
@@ -408,13 +429,16 @@ public class ParquetFileReader implements Closeable {
    * @param filter the filter to apply to row groups
    * @return the metadata with row groups filtered.
    * @throws IOException  if an error occurs while reading the file
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
   public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
     return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
   }
 
   /**
-   * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
   @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
@@ -428,7 +452,10 @@ public class ParquetFileReader implements Closeable {
    * @param filter the filter to apply to row groups
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
+  @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
     return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
   }
@@ -439,35 +466,32 @@ public class ParquetFileReader implements Closeable {
    * @param filter the filter to apply to row groups
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
    */
-  public static final ParquetMetadata readFooter(
-      InputFile file, MetadataFilter filter) throws IOException {
-    ParquetMetadataConverter converter;
-    // TODO: remove this temporary work-around.
-    // this is necessary to pass the Configuration to ParquetMetadataConverter
-    // and should be removed when there is a non-Hadoop configuration.
+  @Deprecated
+  public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
+    ParquetReadOptions options;
     if (file instanceof HadoopInputFile) {
-      converter = new ParquetMetadataConverter(
-          ((HadoopInputFile) file).getConfiguration());
+      options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
+          .withMetadataFilter(filter).build();
     } else {
-      converter = new ParquetMetadataConverter();
+      options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
     }
-    try (SeekableInputStream in = file.newStream()) {
 
-      return readFooter(converter, file.getLength(), file.toString(), in, filter);
+    try (SeekableInputStream in = file.newStream()) {
+      return readFooter(file, options, in);
     }
   }
 
-  /**
-   * Reads the meta data block in the footer of the file using provided input stream
-   * @param fileLen length of the file
-   * @param filePath file location
-   * @param f input stream for the file
-   * @param filter the filter to apply to row groups
-   * @return the metadata blocks in the footer
-   * @throws IOException if an error occurs while reading the file
-   */
-  private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
+  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+    ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
+    return readFooter(file, options, f, converter);
+  }
+
+  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+    long fileLen = file.getLength();
+    String filePath = file.toString();
     LOG.debug("File length {}", fileLen);
     int FOOTER_LENGTH_SIZE = 4;
     if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
@@ -489,43 +513,75 @@ public class ParquetFileReader implements Closeable {
       throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
     }
     f.seek(footerIndex);
-    return converter.readParquetMetadata(f, filter);
+    return converter.readParquetMetadata(f, options.getMetadataFilter());
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0; use {@link #open(InputFile)}
+   */
+  @Deprecated
   public static ParquetFileReader open(Configuration conf, Path file) throws IOException {
-    return new ParquetFileReader(conf, file);
+    return new ParquetFileReader(HadoopInputFile.fromPath(file, conf),
+        HadoopReadOptions.builder(conf).build());
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0; use {@link #open(InputFile,ParquetReadOptions)}
+   */
+  @Deprecated
   public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException {
-    return new ParquetFileReader(conf, file, filter);
+    return open(HadoopInputFile.fromPath(file, conf),
+        HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0
+   */
+  @Deprecated
   public static ParquetFileReader open(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
     return new ParquetFileReader(conf, file, footer);
   }
 
-  private final CodecFactory codecFactory;
+  /**
+   * Open a {@link InputFile file}.
+   *
+   * @param file an input file
+   * @return an open ParquetFileReader
+   */
+  public static ParquetFileReader open(InputFile file) throws IOException {
+    return new ParquetFileReader(file, ParquetReadOptions.builder().build());
+  }
+
+  /**
+   * Open a {@link InputFile file} with {@link ParquetReadOptions options}.
+   *
+   * @param file an input file
+   * @return an open ParquetFileReader
+   */
+  public static ParquetFileReader open(InputFile file, ParquetReadOptions options) throws IOException {
+    return new ParquetFileReader(file, options);
+  }
+
+  private final InputFile file;
   private final SeekableInputStream f;
-  private final FileStatus fileStatus;
-  private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
+  private final ParquetReadOptions options;
+  private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
-  private final ByteBufferAllocator allocator;
-  private final Configuration conf;
+  private final List<BlockMetaData> blocks;
 
   // not final. in some cases, this may be lazily loaded for backward-compat.
   private ParquetMetadata footer;
-  // blocks can be filtered after they are read (or set in the constructor)
-  private List<BlockMetaData> blocks;
 
   private int currentBlock = 0;
   private ColumnChunkPageReadStore currentRowGroup = null;
   private DictionaryPageReader nextDictionaryReader = null;
 
   /**
-   * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData,
-   * Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns)} instead
+   * @deprecated use {@link ParquetFileReader(Configuration,FileMetaData,Path,List,List)} instead.
    */
-  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+  @Deprecated
+  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks,
+                           List<ColumnDescriptor> columns) throws IOException {
     this(configuration, null, filePath, blocks, columns);
   }
 
@@ -541,28 +597,14 @@ public class ParquetFileReader implements Closeable {
       Configuration configuration, FileMetaData fileMetaData,
       Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
     this.converter = new ParquetMetadataConverter(configuration);
-    this.conf = configuration;
+    this.file = HadoopInputFile.fromPath(filePath, configuration);
     this.fileMetaData = fileMetaData;
-    FileSystem fs = filePath.getFileSystem(configuration);
-    this.f = HadoopStreams.wrap(fs.open(filePath));
-    this.fileStatus = fs.getFileStatus(filePath);
-    this.blocks = blocks;
+    this.f = file.newStream();
+    this.options = HadoopReadOptions.builder(configuration).build();
+    this.blocks = filterRowGroups(blocks);
     for (ColumnDescriptor col : columns) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
-    // the page size parameter isn't meaningful when only using
-    // the codec factory to get decompressors
-    this.codecFactory = new CodecFactory(configuration, 0);
-    this.allocator = new HeapByteBufferAllocator();
-  }
-
-  /**
-   * @param configuration the Hadoop Configuration
-   * @param file Path to a parquet file
-   * @throws IOException if the file can not be opened
-   */
-  private ParquetFileReader(Configuration configuration, Path file) throws IOException {
-    this(configuration, file, NO_FILTER);
   }
 
   /**
@@ -570,23 +612,13 @@ public class ParquetFileReader implements Closeable {
    * @param file Path to a parquet file
    * @param filter a {@link MetadataFilter} for selecting row groups
    * @throws IOException if the file can not be opened
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link ParquetFileReader(InputFile,ParquetReadOptions)} instead
    */
+  @Deprecated
   public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
-    this.converter = new ParquetMetadataConverter(conf);
-    this.conf = conf;
-    FileSystem fs = file.getFileSystem(conf);
-    this.fileStatus = fs.getFileStatus(file);
-    this.f = HadoopStreams.wrap(fs.open(file));
-    this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
-    this.fileMetaData = footer.getFileMetaData();
-    this.blocks = footer.getBlocks();
-    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
-      paths.put(ColumnPath.get(col.getPath()), col);
-    }
-    // the page size parameter isn't meaningful when only using
-    // the codec factory to get decompressors
-    this.codecFactory = new CodecFactory(conf, 0);
-    this.allocator = new HeapByteBufferAllocator();
+    this(HadoopInputFile.fromPath(file, conf),
+        HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
   }
 
   /**
@@ -595,29 +627,38 @@ public class ParquetFileReader implements Closeable {
    * @param footer a {@link ParquetMetadata} footer already read from the file
    * @throws IOException if the file can not be opened
    */
+  @Deprecated
   public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
     this.converter = new ParquetMetadataConverter(conf);
-    this.conf = conf;
-    FileSystem fs = file.getFileSystem(conf);
-    this.fileStatus = fs.getFileStatus(file);
-    this.f = HadoopStreams.wrap(fs.open(file));
+    this.file = HadoopInputFile.fromPath(file, conf);
+    this.f = this.file.newStream();
+    this.options = HadoopReadOptions.builder(conf).build();
     this.footer = footer;
     this.fileMetaData = footer.getFileMetaData();
-    this.blocks = footer.getBlocks();
+    this.blocks = filterRowGroups(footer.getBlocks());
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+  }
+
+  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
+    this.converter = new ParquetMetadataConverter(options);
+    this.file = file;
+    this.f = file.newStream();
+    this.options = options;
+    this.footer = readFooter(file, options, f, converter);
+    this.fileMetaData = footer.getFileMetaData();
+    this.blocks = filterRowGroups(footer.getBlocks());
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
-    // the page size parameter isn't meaningful when only using
-    // the codec factory to get decompressors
-    this.codecFactory = new CodecFactory(conf, 0);
-    this.allocator = new HeapByteBufferAllocator();
   }
 
   public ParquetMetadata getFooter() {
     if (footer == null) {
       try {
         // don't read the row groups because this.blocks is always set
-        this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
+        this.footer = readFooter(file, options, f, converter);
       } catch (IOException e) {
         throw new ParquetDecodingException("Unable to read file footer", e);
       }
@@ -640,25 +681,36 @@ public class ParquetFileReader implements Closeable {
     return total;
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0; use {@link #getFile()} instead
+   */
+  @Deprecated
   public Path getPath() {
-    return fileStatus.getPath();
+    return new Path(file.toString());
+  }
+
+  public String getFile() {
+    return file.toString();
   }
 
-  void filterRowGroups(FilterCompat.Filter filter) throws IOException {
+  private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
     // set up data filters based on configured levels
-    List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
+    List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
 
-    if (conf.getBoolean(
-        STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
+    if (options.useStatsFilter()) {
       levels.add(STATISTICS);
     }
 
-    if (conf.getBoolean(
-        DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
+    if (options.useDictionaryFilter()) {
       levels.add(DICTIONARY);
     }
 
-    this.blocks = RowGroupFilter.filterRowGroups(levels, filter, blocks, this);
+    FilterCompat.Filter recordFilter = options.getRecordFilter();
+    if (recordFilter != null) {
+      return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+    }
+
+    return blocks;
   }
 
   public List<BlockMetaData> getRowGroups() {
@@ -785,7 +837,7 @@ public class ParquetFileReader implements Closeable {
     }
 
     DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f);
-    BytesDecompressor decompressor = codecFactory.getDecompressor(meta.getCodec());
+    BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(meta.getCodec());
 
     return new DictionaryPage(
         decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
@@ -817,9 +869,7 @@ public class ParquetFileReader implements Closeable {
         f.close();
       }
     } finally {
-      if (codecFactory != null) {
-        codecFactory.release();
-      }
+      options.getCodecFactory().release();
     }
   }
 
@@ -929,7 +979,7 @@ public class ParquetFileReader implements Closeable {
             " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
             + " pages ending at file offset " + (descriptor.fileOffset + pos()));
       }
-      BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
+      BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
       return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
     }
 
@@ -1077,7 +1127,7 @@ public class ParquetFileReader implements Closeable {
       f.seek(offset);
 
       // Allocate the bytebuffer based on whether the FS can support it.
-      ByteBuffer chunksByteBuffer = allocator.allocate(length);
+      ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length);
       f.readFully(chunksByteBuffer);
 
       // report in a counter the data we just scanned
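
[Reviewer note] A sketch of the new open(InputFile, ParquetReadOptions) entry point added above; the path is hypothetical and any InputFile implementation can be substituted for HadoopInputFile:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.io.InputFile;

    public class OpenWithOptionsExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        InputFile file = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf);

        ParquetReadOptions options = HadoopReadOptions.builder(conf)
            .useStatsFilter()   // row groups are now filtered while the footer is read
            .build();

        try (ParquetFileReader reader = ParquetFileReader.open(file, options)) {
          System.out.println("row groups: " + reader.getRowGroups().size());
          System.out.println("records:    " + reader.getRecordCount());
          System.out.println("file:       " + reader.getFile());
        }
      }
    }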

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 57500bf..da8635d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -23,7 +23,6 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -36,7 +35,6 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -59,9 +57,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.TypeUtil;
@@ -85,22 +87,6 @@ public class ParquetFileWriter {
   public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
   public static final int CURRENT_VERSION = 1;
 
-  // need to supply a buffer size when setting block size. this is the default
-  // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
-  private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
-
-  // visible for testing
-  static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
-  static {
-    BLOCK_FS_SCHEMES.add("hdfs");
-    BLOCK_FS_SCHEMES.add("webhdfs");
-    BLOCK_FS_SCHEMES.add("viewfs");
-  }
-
-  private static boolean supportsBlockSize(FileSystem fs) {
-    return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
-  }
-
   // File creation modes
   public static enum Mode {
     CREATE,
@@ -108,7 +94,7 @@ public class ParquetFileWriter {
   }
 
   private final MessageType schema;
-  private final FSDataOutputStream out;
+  private final PositionOutputStream out;
   private final AlignmentStrategy alignment;
 
   // file data
@@ -193,11 +179,14 @@ public class ParquetFileWriter {
    * @param schema the schema of the data
    * @param file the file to write to
    * @throws IOException if the file can not be created
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link 
ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
    */
+  @Deprecated
   public ParquetFileWriter(Configuration configuration, MessageType schema,
       Path file) throws IOException {
-    this(configuration, schema, file, Mode.CREATE, DEFAULT_BLOCK_SIZE,
-        MAX_PADDING_SIZE_DEFAULT);
+    this(HadoopOutputFile.fromPath(file, configuration),
+        schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
   }
 
   /**
@@ -206,11 +195,14 @@ public class ParquetFileWriter {
    * @param file the file to write to
    * @param mode file creation mode
    * @throws IOException if the file can not be created
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link 
ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
    */
+  @Deprecated
   public ParquetFileWriter(Configuration configuration, MessageType schema,
                            Path file, Mode mode) throws IOException {
-    this(configuration, schema, file, mode, DEFAULT_BLOCK_SIZE,
-        MAX_PADDING_SIZE_DEFAULT);
+    this(HadoopOutputFile.fromPath(file, configuration),
+        schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
   }
 
   /**
@@ -219,36 +211,54 @@ public class ParquetFileWriter {
    * @param file the file to write to
    * @param mode file creation mode
    * @param rowGroupSize the row group size
+   * @param maxPaddingSize the maximum padding
    * @throws IOException if the file can not be created
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link 
ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
    */
+  @Deprecated
   public ParquetFileWriter(Configuration configuration, MessageType schema,
                            Path file, Mode mode, long rowGroupSize,
                            int maxPaddingSize)
       throws IOException {
-    TypeUtil.checkValidWriteSchema(schema);
-    this.schema = schema;
-    FileSystem fs = file.getFileSystem(configuration);
-    boolean overwriteFlag = (mode == Mode.OVERWRITE);
+    this(HadoopOutputFile.fromPath(file, configuration),
+        schema, mode, rowGroupSize, maxPaddingSize);
+  }
 
-    if (supportsBlockSize(fs)) {
-      // use the default block size, unless row group size is larger
-      long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
+  /**
+   * @param file OutputFile to create or overwrite
+   * @param schema the schema of the data
+   * @param mode file creation mode
+   * @param rowGroupSize the row group size
+   * @param maxPaddingSize the maximum padding
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                           long rowGroupSize, int maxPaddingSize)
+      throws IOException {
+    TypeUtil.checkValidWriteSchema(schema);
 
-      this.alignment = PaddingAlignment.get(
-          dfsBlockSize, rowGroupSize, maxPaddingSize);
-      this.out = fs.create(file, overwriteFlag, DFS_BUFFER_SIZE_DEFAULT,
-          fs.getDefaultReplication(file), dfsBlockSize);
+    this.schema = schema;
 
+    long blockSize = rowGroupSize;
+    if (file.supportsBlockSize()) {
+      blockSize = Math.max(file.defaultBlockSize(), rowGroupSize);
+      this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, 
maxPaddingSize);
     } else {
       this.alignment = NoAlignment.get(rowGroupSize);
-      this.out = fs.create(file, overwriteFlag);
+    }
+
+    if (mode == Mode.OVERWRITE) {
+      this.out = file.createOrOverwrite(blockSize);
+    } else {
+      this.out = file.create(blockSize);
     }
 
     this.encodingStatsBuilder = new EncodingStats.Builder();
   }
 
   /**
-   * FOR TESTING ONLY.
+   * FOR TESTING ONLY. This supports testing block padding behavior on the 
local FS.
    *
    * @param configuration Hadoop configuration
    * @param schema the schema of the data
@@ -263,11 +273,10 @@ public class ParquetFileWriter {
     this.schema = schema;
     this.alignment = PaddingAlignment.get(
         rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
-    this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT,
-        fs.getDefaultReplication(file), rowAndBlockSize);
+    this.out = HadoopStreams.wrap(
+        fs.create(file, true, 8192, fs.getDefaultReplication(file), 
rowAndBlockSize));
     this.encodingStatsBuilder = new EncodingStats.Builder();
   }
-
   /**
    * start the file
    * @throws IOException
@@ -490,10 +499,23 @@ public class ParquetFileWriter {
     currentBlock = null;
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} 
instead
+   */
+  @Deprecated
   public void appendFile(Configuration conf, Path file) throws IOException {
     ParquetFileReader.open(conf, file).appendTo(this);
   }
 
+  public void appendFile(InputFile file) throws IOException {
+    ParquetFileReader.open(file).appendTo(this);
+  }
+
+  /**
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link 
#appendRowGroups(SeekableInputStream,List,boolean)} instead
+   */
+  @Deprecated
   public void appendRowGroups(FSDataInputStream file,
                               List<BlockMetaData> rowGroups,
                               boolean dropColumns) throws IOException {
@@ -508,13 +530,18 @@ public class ParquetFileWriter {
     }
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link 
#appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead
+   */
+  @Deprecated
   public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
                              boolean dropColumns) throws IOException {
-    appendRowGroup(from, rowGroup, dropColumns);
+    appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
   }
 
   public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
-    boolean dropColumns) throws IOException {
+                             boolean dropColumns) throws IOException {
     startBlock(rowGroup.getRowCount());
 
     Map<String, ColumnChunkMetaData> columnsToCopy =
@@ -603,13 +630,13 @@ public class ParquetFileWriter {
   /**
    * Copy from a FS input stream to an output stream. Thread-safe
    *
-   * @param from a {@link FSDataInputStream}
-   * @param to any {@link OutputStream}
+   * @param from a {@link SeekableInputStream}
+   * @param to any {@link PositionOutputStream}
    * @param start where in the from stream to start copying
    * @param length the number of bytes to copy
    * @throws IOException
    */
-  private static void copy(SeekableInputStream from, FSDataOutputStream to,
+  private static void copy(SeekableInputStream from, PositionOutputStream to,
                            long start, long length) throws IOException{
     LOG.debug("Copying {} bytes at {} to {}" ,length , start , to.getPos());
     from.seek(start);
@@ -642,7 +669,7 @@ public class ParquetFileWriter {
     out.close();
   }
 
-  private static void serializeFooter(ParquetMetadata footer, 
FSDataOutputStream out) throws IOException {
+  private static void serializeFooter(ParquetMetadata footer, 
PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
     org.apache.parquet.format.FileMetaData parquetMetadata = 
metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
     writeFileMetaData(parquetMetadata, out);
@@ -654,7 +681,9 @@ public class ParquetFileWriter {
   /**
    * Given a list of metadata files, merge them into a single ParquetMetadata
    * Requires that the schemas be compatible, and the extraMetadata be exactly 
equal.
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
    */
+  @Deprecated
   public static ParquetMetadata mergeMetadataFiles(List<Path> files,  
Configuration conf) throws IOException {
     Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list 
of metadata");
 
@@ -677,7 +706,9 @@ public class ParquetFileWriter {
    * Requires that the schemas be compatible, and the extraMetaData be exactly 
equal.
    * This is useful when merging 2 directories of parquet files into a single 
directory, as long
    * as both directories were written with compatible schemas and equal 
extraMetaData.
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
    */
+  @Deprecated
   public static void writeMergedMetadataFile(List<Path> files, Path 
outputPath, Configuration conf) throws IOException {
     ParquetMetadata merged = mergeMetadataFiles(files, conf);
     writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
@@ -688,8 +719,8 @@ public class ParquetFileWriter {
    * @param configuration the configuration to use to get the FileSystem
    * @param outputPath the directory to write the _metadata file to
    * @param footers the list of footers to merge
-   * @deprecated use the variant of writeMetadataFile that takes a {@link 
JobSummaryLevel} as an argument.
    * @throws IOException
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
    */
   @Deprecated
   public static void writeMetadataFile(Configuration configuration, Path 
outputPath, List<Footer> footers) throws IOException {
@@ -698,7 +729,9 @@ public class ParquetFileWriter {
 
   /**
    * writes _common_metadata file, and optionally a _metadata file depending 
on the {@link JobSummaryLevel} provided
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
    */
+  @Deprecated
   public static void writeMetadataFile(Configuration configuration, Path 
outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
     Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == 
JobSummaryLevel.COMMON_ONLY,
         "Unsupported level: " + level);
@@ -715,15 +748,23 @@ public class ParquetFileWriter {
     writeMetadataFile(outputPath, metadataFooter, fs, 
PARQUET_COMMON_METADATA_FILE);
   }
 
+  /**
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
+   */
+  @Deprecated
   private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata 
metadataFooter, FileSystem fs, String parquetMetadataFile)
       throws IOException {
     Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
     writeMetadataFile(metaDataPath, metadataFooter, fs);
   }
 
+  /**
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
+   */
+  @Deprecated
   private static void writeMetadataFile(Path outputPath, ParquetMetadata 
metadataFooter, FileSystem fs)
       throws IOException {
-    FSDataOutputStream metadata = fs.create(outputPath);
+    PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
     metadata.write(MAGIC);
     serializeFooter(metadataFooter, metadata);
     metadata.close();
@@ -850,9 +891,9 @@ public class ParquetFileWriter {
   }
 
   private interface AlignmentStrategy {
-    void alignForRowGroup(FSDataOutputStream out) throws IOException;
+    void alignForRowGroup(PositionOutputStream out) throws IOException;
 
-    long nextRowGroupSize(FSDataOutputStream out) throws IOException;
+    long nextRowGroupSize(PositionOutputStream out) throws IOException;
   }
 
   private static class NoAlignment implements AlignmentStrategy {
@@ -867,11 +908,11 @@ public class ParquetFileWriter {
     }
 
     @Override
-    public void alignForRowGroup(FSDataOutputStream out) {
+    public void alignForRowGroup(PositionOutputStream out) {
     }
 
     @Override
-    public long nextRowGroupSize(FSDataOutputStream out) {
+    public long nextRowGroupSize(PositionOutputStream out) {
       return rowGroupSize;
     }
   }
@@ -900,7 +941,7 @@ public class ParquetFileWriter {
     }
 
     @Override
-    public void alignForRowGroup(FSDataOutputStream out) throws IOException {
+    public void alignForRowGroup(PositionOutputStream out) throws IOException {
       long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
 
       if (isPaddingNeeded(remaining)) {
@@ -912,7 +953,7 @@ public class ParquetFileWriter {
     }
 
     @Override
-    public long nextRowGroupSize(FSDataOutputStream out) throws IOException {
+    public long nextRowGroupSize(PositionOutputStream out) throws IOException {
       if (maxPaddingSize <= 0) {
         return rowGroupSize;
       }
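
A rough usage sketch for the OutputFile-based constructor and the new appendFile(InputFile) overload above; paths, schema, and the 128 MB / 8 MB sizes are placeholders, and the final end(Map) call is assumed from the writer's existing API rather than shown in this patch:

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class AppendSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required int32 id; required binary name (UTF8); }");

        // block sizing and padding are now negotiated via OutputFile.supportsBlockSize()
        ParquetFileWriter writer = new ParquetFileWriter(
            HadoopOutputFile.fromPath(new Path("merged.parquet"), conf),
            schema, ParquetFileWriter.Mode.CREATE,
            128 * 1024 * 1024,   // row group size (placeholder)
            8 * 1024 * 1024);    // max padding (placeholder)

        writer.start();
        // copy whole, schema-compatible files through the new InputFile overload
        writer.appendFile(HadoopInputFile.fromPath(new Path("part-00000.parquet"), conf));
        writer.appendFile(HadoopInputFile.fromPath(new Path("part-00001.parquet"), conf));
        writer.end(Collections.<String, String>emptyMap());  // assumed existing end(Map) API
      }
    }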

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 7c5b5be..979388d 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -120,19 +120,16 @@ public class ParquetInputFormat<T> extends 
FileInputFormat<Void, T> {
    * key to configure whether record-level filtering is enabled
    */
   public static final String RECORD_FILTERING_ENABLED = 
"parquet.filter.record-level.enabled";
-  static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
 
   /**
    * key to configure whether row group stats filtering is enabled
    */
   public static final String STATS_FILTERING_ENABLED = 
"parquet.filter.stats.enabled";
-  static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
 
   /**
    * key to configure whether row group dictionary filtering is enabled
    */
   public static final String DICTIONARY_FILTERING_ENABLED = 
"parquet.filter.dictionary.enabled";
-  static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;
 
   /**
    * key to turn on or off task side metadata loading (default true)
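
With the package-private *_FILTERING_ENABLED_DEFAULT constants dropped here, the defaults presumably live with the read options instead. A hedged sketch of toggling the same behavior through the Configuration that HadoopReadOptions.builder(conf) consumes (whether each key is honored is defined by the options implementation, not by this file):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class FilterTogglesSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // the public keys are unchanged; only the default constants move out of the input format
        conf.setBoolean(ParquetInputFormat.STATS_FILTERING_ENABLED, true);
        conf.setBoolean(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, true);
        conf.setBoolean(ParquetInputFormat.RECORD_FILTERING_ENABLED, false);

        // assumption: the options builder resolves these keys from the Configuration
        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
        // pass `options` to ParquetFileReader.open(file, options) as in the reader sketch above
        System.out.println("built read options from Configuration toggles");
      }
    }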

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 78af765..340ec11 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -42,6 +42,7 @@ import 
org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
 import org.apache.parquet.hadoop.codec.CodecConfig;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -383,8 +384,8 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
     }
 
     WriteContext init = writeSupport.init(conf);
-    ParquetFileWriter w = new ParquetFileWriter(
-        conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
+    ParquetFileWriter w = new 
ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
+        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index ff9c811..1ba5380 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -22,7 +22,8 @@ import static org.apache.parquet.Preconditions.checkNotNull;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,12 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.InputFile;
 
 /**
  * Read records from a Parquet file.
@@ -45,9 +51,8 @@ import org.apache.parquet.hadoop.util.HiddenFileFilter;
 public class ParquetReader<T> implements Closeable {
 
   private final ReadSupport<T> readSupport;
-  private final Configuration conf;
-  private final Iterator<Footer> footersIterator;
-  private final Filter filter;
+  private final Iterator<InputFile> filesIterator;
+  private final ParquetReadOptions options;
 
   private InternalParquetRecordReader<T> reader;
 
@@ -100,17 +105,22 @@ public class ParquetReader<T> implements Closeable {
   }
 
   private ParquetReader(Configuration conf,
-                       Path file,
-                       ReadSupport<T> readSupport,
-                       Filter filter) throws IOException {
-    this.readSupport = readSupport;
-    this.filter = checkNotNull(filter, "filter");
-    this.conf = conf;
+                        Path file,
+                        ReadSupport<T> readSupport,
+                        FilterCompat.Filter filter) throws IOException {
+    this(Collections.singletonList((InputFile) HadoopInputFile.fromPath(file, 
conf)),
+        HadoopReadOptions.builder(conf)
+            .withRecordFilter(checkNotNull(filter, "filter"))
+            .build(),
+        readSupport);
+  }
 
-    FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, 
HiddenFileFilter.INSTANCE));
-    List<Footer> footers = 
ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, 
false);
-    this.footersIterator = footers.iterator();
+  private ParquetReader(List<InputFile> files,
+                        ParquetReadOptions options,
+                        ReadSupport<T> readSupport) throws IOException {
+    this.readSupport = readSupport;
+    this.options = options;
+    this.filesIterator = files.iterator();
   }
 
   /**
@@ -135,18 +145,15 @@ public class ParquetReader<T> implements Closeable {
       reader.close();
       reader = null;
     }
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
 
-      ParquetFileReader fileReader = ParquetFileReader.open(
-          conf, footer.getFile(), footer.getParquetMetadata());
+    if (filesIterator.hasNext()) {
+      InputFile file = filesIterator.next();
 
-      // apply data filters
-      fileReader.filterRowGroups(filter);
+      ParquetFileReader fileReader = ParquetFileReader.open(file, options);
 
-      reader = new InternalParquetRecordReader<T>(readSupport, filter);
+      reader = new InternalParquetRecordReader<>(readSupport, 
options.getRecordFilter());
 
-      reader.initialize(fileReader, conf);
+      reader.initialize(fileReader, options);
     }
   }
 
@@ -157,37 +164,114 @@ public class ParquetReader<T> implements Closeable {
     }
   }
 
+  public static <T> Builder<T> read(InputFile file) throws IOException {
+    return new Builder<>(file);
+  }
+
   public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
-    return new Builder<T>(readSupport, path);
+    return new Builder<>(readSupport, path);
   }
 
   public static class Builder<T> {
     private final ReadSupport<T> readSupport;
-    private final Path file;
-    private Filter filter;
-    protected Configuration conf;
+    private final InputFile file;
+    private final Path path;
+    private Filter filter = null;
+    protected Configuration conf = new Configuration();
+    private ParquetReadOptions.Builder optionsBuilder = 
HadoopReadOptions.builder(conf);
 
+    @Deprecated
     private Builder(ReadSupport<T> readSupport, Path path) {
       this.readSupport = checkNotNull(readSupport, "readSupport");
-      this.file = checkNotNull(path, "path");
-      this.conf = new Configuration();
-      this.filter = FilterCompat.NOOP;
+      this.file = null;
+      this.path = checkNotNull(path, "path");
     }
 
+    @Deprecated
     protected Builder(Path path) {
       this.readSupport = null;
-      this.file = checkNotNull(path, "path");
-      this.conf = new Configuration();
-      this.filter = FilterCompat.NOOP;
+      this.file = null;
+      this.path = checkNotNull(path, "path");
     }
 
+    protected Builder(InputFile file) {
+      this.readSupport = null;
+      this.file = checkNotNull(file, "file");
+      this.path = null;
+    }
+
+    // when called, resets options to the defaults from conf
     public Builder<T> withConf(Configuration conf) {
       this.conf = checkNotNull(conf, "conf");
+
+      // previous versions didn't use the builder, so may set filter before 
conf. this maintains
+      // compatibility for filter. other options are reset by a new conf.
+      this.optionsBuilder = HadoopReadOptions.builder(conf);
+      if (filter != null) {
+        optionsBuilder.withRecordFilter(filter);
+      }
+
       return this;
     }
 
     public Builder<T> withFilter(Filter filter) {
-      this.filter = checkNotNull(filter, "filter");
+      this.filter = filter;
+      optionsBuilder.withRecordFilter(filter);
+      return this;
+    }
+
+    public Builder<T> useSignedStringMinMax(boolean useSignedStringMinMax) {
+      optionsBuilder.useSignedStringMinMax(useSignedStringMinMax);
+      return this;
+    }
+
+    public Builder<T> useSignedStringMinMax() {
+      optionsBuilder.useSignedStringMinMax();
+      return this;
+    }
+
+    public Builder<T> useStatsFilter(boolean useStatsFilter) {
+      optionsBuilder.useStatsFilter(useStatsFilter);
+      return this;
+    }
+
+    public Builder<T> useStatsFilter() {
+      optionsBuilder.useStatsFilter();
+      return this;
+    }
+
+    public Builder<T> useDictionaryFilter(boolean useDictionaryFilter) {
+      optionsBuilder.useDictionaryFilter(useDictionaryFilter);
+      return this;
+    }
+
+    public Builder<T> useDictionaryFilter() {
+      optionsBuilder.useDictionaryFilter();
+      return this;
+    }
+
+    public Builder<T> useRecordFilter(boolean useRecordFilter) {
+      optionsBuilder.useRecordFilter(useRecordFilter);
+      return this;
+    }
+
+    public Builder<T> useRecordFilter() {
+      optionsBuilder.useRecordFilter();
+      return this;
+    }
+
+    public Builder<T> withFileRange(long start, long end) {
+      optionsBuilder.withRange(start, end);
+      return this;
+    }
+
+    public Builder<T> withCodecFactory(CompressionCodecFactory codecFactory) {
+      optionsBuilder.withCodecFactory(codecFactory);
+      return this;
+    }
+
+    public Builder<T> set(String key, String value) {
+      optionsBuilder.set(key, value);
       return this;
     }
 
@@ -199,7 +283,29 @@ public class ParquetReader<T> implements Closeable {
     }
 
     public ParquetReader<T> build() throws IOException {
-      return new ParquetReader<T>(conf, file, getReadSupport(), filter);
+      ParquetReadOptions options = optionsBuilder.build();
+
+      if (path != null) {
+        FileSystem fs = path.getFileSystem(conf);
+        FileStatus stat = fs.getFileStatus(path);
+
+        if (stat.isFile()) {
+          return new ParquetReader<>(
+              Collections.singletonList((InputFile) 
HadoopInputFile.fromStatus(stat, conf)),
+              options,
+              getReadSupport());
+
+        } else {
+          List<InputFile> files = new ArrayList<>();
+          for (FileStatus fileStatus : fs.listStatus(path, 
HiddenFileFilter.INSTANCE)) {
+            files.add(HadoopInputFile.fromStatus(fileStatus, conf));
+          }
+          return new ParquetReader<T>(files, options, getReadSupport());
+        }
+
+      } else {
+        return new ParquetReader<>(Collections.singletonList(file), options, 
getReadSupport());
+      }
     }
   }
 }
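
A sketch of the reworked builder with the new option pass-throughs; GroupReadSupport, the "id" column, and the literal 42 are illustrative, and note the ordering caveat from the diff: withConf(conf) resets the options builder and re-applies only a previously set record filter:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class ReaderBuilderSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        try (ParquetReader<Group> reader =
                 ParquetReader.builder(new GroupReadSupport(), new Path("file:///tmp/example.parquet"))
                     .withConf(conf)   // set conf first: it resets the options builder
                     .withFilter(FilterCompat.get(
                         FilterApi.eq(FilterApi.intColumn("id"), 42)))
                     .useDictionaryFilter()   // new pass-throughs to ParquetReadOptions
                     .useStatsFilter()
                     .build()) {
          Group record;
          while ((record = reader.read()) != null) {
            System.out.println(record);
          }
        }
      }
    }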

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index ebdc686..9ca8be9 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -18,14 +18,9 @@
  */
 package org.apache.parquet.hadoop;
 
-import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.*;
-import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
-import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -37,21 +32,21 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
-import 
org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
@@ -158,13 +153,16 @@ public class ParquetRecordReader<T> extends 
RecordReader<Void, T> {
     long[] rowGroupOffsets = split.getRowGroupOffsets();
 
     // if task.side.metadata is set, rowGroupOffsets is null
-    MetadataFilter metadataFilter = (rowGroupOffsets != null ?
-        offsets(rowGroupOffsets) :
-        range(split.getStart(), split.getEnd()));
+    ParquetReadOptions.Builder optionsBuilder = 
HadoopReadOptions.builder(configuration);
+    if (rowGroupOffsets != null) {
+      optionsBuilder.withOffsets(rowGroupOffsets);
+    } else {
+      optionsBuilder.withRange(split.getStart(), split.getEnd());
+    }
 
     // open a reader with the metadata filter
     ParquetFileReader reader = ParquetFileReader.open(
-        configuration, path, metadataFilter);
+        HadoopInputFile.fromPath(path, configuration), optionsBuilder.build());
 
     if (rowGroupOffsets != null) {
       // verify a row group was found for each offset
@@ -175,10 +173,6 @@ public class ParquetRecordReader<T> extends 
RecordReader<Void, T> {
             + " expected: " + Arrays.toString(rowGroupOffsets)
             + " found: " + blocks);
       }
-
-    } else {
-      // apply data filters
-      reader.filterRowGroups(getFilter(configuration));
     }
 
     if (!reader.getRowGroups().isEmpty()) {
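
The record reader now expresses its split through options: withOffsets(rowGroupOffsets) when row group offsets are present, withRange(start, end) otherwise. A standalone sketch of the range form (bounds and path are placeholders):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class SplitRangeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        long splitStart = 0L;                    // placeholder split bounds
        long splitEnd = 64L * 1024 * 1024;

        // range-based footer filtering replaces the old MetadataFilter plumbing
        ParquetReadOptions options = HadoopReadOptions.builder(conf)
            .withRange(splitStart, splitEnd)
            .build();

        try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("file:///tmp/example.parquet"), conf), options)) {
          System.out.println("row groups in range: " + reader.getRowGroups().size());
        }
      }
    }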
