ORC-101 Correct bloom filters for strings and decimals to use utf8 encoding.


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/9d39cb80
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/9d39cb80
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/9d39cb80

Branch: refs/heads/master
Commit: 9d39cb80f455f7c341bd4a9421651badb1d137f3
Parents: 7118e96
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Sep 13 13:28:44 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Sep 20 15:12:57 2016 -0500

----------------------------------------------------------------------
 c++/include/orc/Reader.hh                       |   1 +
 c++/src/Reader.cc                               |   2 +
 .../src/java/org/apache/orc/BloomFilterIO.java  |  50 --
 .../src/java/org/apache/orc/DataReader.java     |   7 +-
 java/core/src/java/org/apache/orc/OrcConf.java  |  10 +
 java/core/src/java/org/apache/orc/OrcFile.java  |  51 +-
 .../java/org/apache/orc/TypeDescription.java    |  26 +
 .../orc/impl/ConvertTreeReaderFactory.java      |  12 +-
 .../src/java/org/apache/orc/impl/OrcIndex.java  |  10 +-
 .../org/apache/orc/impl/RecordReaderImpl.java   |  70 ++-
 .../org/apache/orc/impl/RecordReaderUtils.java  | 192 ++++++--
 .../org/apache/orc/impl/SchemaEvolution.java    |   4 +
 .../java/org/apache/orc/impl/StreamName.java    |   1 +
 .../java/org/apache/orc/impl/WriterImpl.java    | 228 +++++++--
 .../java/org/apache/orc/util/BloomFilter.java   | 312 ++++++++++++
 .../java/org/apache/orc/util/BloomFilterIO.java |  93 ++++
 .../org/apache/orc/util/BloomFilterUtf8.java    |  55 +++
 .../test/org/apache/orc/TestVectorOrcFile.java  |   4 +-
 .../apache/orc/impl/TestRecordReaderImpl.java   | 484 +++++++++++++------
 .../test/org/apache/orc/util/TestMurmur3.java   | 225 +++++++++
 java/core/src/test/resources/log4j.properties   |   3 +
 .../src/test/resources/log4j.properties         |   3 +
 .../apache/hive/common/util/BloomFilter.java    | 313 ------------
 .../org/apache/hive/common/util/Murmur3.java    | 335 -------------
 .../src/java/org/apache/orc/util/Murmur3.java   | 335 +++++++++++++
 .../apache/hive/common/util/TestMurmur3.java    | 224 ---------
 .../src/java/org/apache/orc/tools/FileDump.java |  20 +-
 .../java/org/apache/orc/tools/JsonFileDump.java |  27 +-
 .../test/org/apache/orc/tools/TestFileDump.java |   6 +-
 java/tools/src/test/resources/log4j.properties  |  21 +
 .../resources/orc-file-dump-bloomfilter.out     | 106 ++--
 .../resources/orc-file-dump-bloomfilter2.out    | 121 +++--
 .../orc-file-dump-dictionary-threshold.out      |   2 +-
 .../tools/src/test/resources/orc-file-dump.json | 150 +++---
 java/tools/src/test/resources/orc-file-dump.out |   2 +-
 .../src/test/resources/orc-file-has-null.out    |   2 +-
 proto/orc_proto.proto                           |   2 +
 37 files changed, 2115 insertions(+), 1394 deletions(-)
----------------------------------------------------------------------
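
A note on the underlying bug: the old BloomFilter hashed strings through
String#getBytes(), which uses the JVM's default charset, so a writer and a
reader running with different default charsets hash different byte sequences
for the same value, and predicate pushdown can wrongly eliminate row groups.
A minimal, self-contained illustration (the value and charset names below are
only examples):

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    public class CharsetMismatch {
      public static void main(String[] args) {
        String value = "café";
        // a writer JVM defaulting to ISO-8859-1 would hash these 4 bytes
        byte[] writerBytes = value.getBytes(Charset.forName("ISO-8859-1"));
        // a reader JVM defaulting to UTF-8 probes with these 5 bytes
        byte[] readerBytes = value.getBytes(StandardCharsets.UTF_8);
        // different bytes -> different hash positions -> false negatives
        System.out.println(writerBytes.length + " vs " + readerBytes.length);
      }
    }

The fix introduces BLOOM_FILTER_UTF8 streams that always hash UTF-8 bytes,
while old BLOOM_FILTER streams remain readable for compatibility.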


http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/c++/include/orc/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 25a0a17..eacbd80 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -53,6 +53,7 @@ namespace orc {
     WriterVersion_HIVE_4243 = 2,
     WriterVersion_HIVE_12055 = 3,
     WriterVersion_HIVE_13083 = 4,
+    WriterVersion_ORC_101 = 5,
     WriterVersion_MAX = INT64_MAX
   };
 

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 9b1f1b9..91f4ea1 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -72,6 +72,8 @@ namespace orc {
       return "HIVE-12055";
     case WriterVersion_HIVE_13083:
       return "HIVE-13083";
+    case WriterVersion_ORC_101:
+      return "ORC-101";
     }
     std::stringstream buffer;
     buffer << "future - " << version;

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/BloomFilterIO.java b/java/core/src/java/org/apache/orc/BloomFilterIO.java
deleted file mode 100644
index 106227d..0000000
--- a/java/core/src/java/org/apache/orc/BloomFilterIO.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc;
-
-import org.apache.hive.common.util.BloomFilter;
-
-public class BloomFilterIO extends BloomFilter {
-
-  public BloomFilterIO(long expectedEntries) {
-    super(expectedEntries, DEFAULT_FPP);
-  }
-
-  public BloomFilterIO(long expectedEntries, double fpp) {
-    super(expectedEntries, fpp);
-  }
-
-  static long[] toArray(OrcProto.BloomFilter filter) {
-    long[] result = new long[filter.getBitsetCount()];
-    int i =0;
-    for(Long l: filter.getBitsetList()) {
-      result[i++] = l;
-    }
-    return result;
-  }
-
-/**
- * Initializes the BloomFilter from the given Orc BloomFilter
- */
-  public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
-    this.bitSet = new BitSet(toArray(bloomFilter));
-    this.numHashFunctions = bloomFilter.getNumHashFunctions();
-    this.numBits = (int) this.bitSet.bitSize();
-  }
-}
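
The replacement lives in org.apache.orc.util.BloomFilterIO (added by this
commit but not quoted in full in this mail) and is used through static
serialize and deserialize methods rather than the constructors above. Judging
from the call sites in RecordReaderImpl and WriterImpl later in this diff, the
change at a reader call site looks roughly like this (variable names are
illustrative):

    // before: one implementation, default-charset hashing
    BloomFilterIO bf = proto == null ? null : new BloomFilterIO(proto);

    // after: implementation chosen from stream kind, writer version, and
    // column category, so old and new encodings can coexist
    BloomFilter bf =
        BloomFilterIO.deserialize(kind, writerVersion, category, proto);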

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index a5dbb76..b3f91f2 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -31,9 +31,14 @@ public interface DataReader extends AutoCloseable {
   void open() throws IOException;
 
   OrcIndex readRowIndex(StripeInformation stripe,
+                        TypeDescription fileSchema,
                         OrcProto.StripeFooter footer,
-                        boolean[] included, OrcProto.RowIndex[] indexes,
+                        boolean ignoreNonUtf8BloomFilter,
+                        boolean[] included,
+                        OrcProto.RowIndex[] indexes,
                         boolean[] sargColumns,
+                        OrcFile.WriterVersion version,
+                        OrcProto.Stream.Kind[] bloomFilterKinds,
                         OrcProto.BloomFilterIndex[] bloomFilterIndices
                         ) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index ac8e3f0..05ab13b 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -105,6 +105,16 @@ public enum OrcConf {
           "dictionary or not will be retained thereafter."),
   BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns", "orc.bloom.filter.columns",
       "", "List of columns to create bloom filters for when writing."),
+  BLOOM_FILTER_WRITE_VERSION("orc.bloom.filter.write.version",
+      "orc.bloom.filter.write.version", 
OrcFile.BloomFilterVersion.UTF8.toString(),
+      "Which version of the bloom filters should we write.\n" +
+          "The choices are:\n" +
+          "  original - writes two versions of the bloom filters for use by\n" 
+
+          "             both old and new readers.\n" +
+          "  utf8 - writes just the new bloom filters."),
+  IGNORE_NON_UTF8_BLOOM_FILTERS("orc.bloom.filter.ignore.non-utf8",
+      "orc.bloom.filter.ignore.non-utf8", false,
+      "Should the reader ignore the obsolete non-UTF8 bloom filters."),
   MAX_FILE_LENGTH("orc.max.file.length", "orc.max.file.length", Long.MAX_VALUE,
       "The maximum size of the file to read for finding the file tail. This\n" 
+
           "is primarily used for streaming ingest to read intermediate\n" +

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index ddfa9f7..6b2d48e 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -108,6 +108,7 @@ public class OrcFile {
     HIVE_4243(2), // use real column names from Hive tables
     HIVE_12055(3), // vectorized writer
     HIVE_13083(4), // decimal writer updating present stream wrongly
+    ORC_101(5),    // bloom filters use utf8
 
     // Don't use any magic numbers here except for the below:
     FUTURE(Integer.MAX_VALUE); // a version from a future writer
@@ -144,8 +145,12 @@ public class OrcFile {
       if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
       return values[val];
     }
+
+    public boolean includes(WriterVersion other) {
+      return id >= other.id;
+    }
   }
-  public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083;
+  public static final WriterVersion CURRENT_WRITER = WriterVersion.ORC_101;
 
   public enum EncodingStrategy {
     SPEED, COMPRESSION
@@ -231,6 +236,33 @@ public class OrcFile {
     void preFooterWrite(WriterContext context) throws IOException;
   }
 
+  public static enum BloomFilterVersion {
+    // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+    // both old and new readers.
+    ORIGINAL("original"),
+    // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+    // See ORC-101
+    UTF8("utf8");
+
+    private final String id;
+    private BloomFilterVersion(String id) {
+      this.id = id;
+    }
+
+    public String toString() {
+      return id;
+    }
+
+    public static BloomFilterVersion fromString(String s) {
+      for (BloomFilterVersion version: values()) {
+        if (version.id.equals(s)) {
+          return version;
+        }
+      }
+      throw new IllegalArgumentException("Unknown BloomFilterVersion " + s);
+    }
+  }
+
   /**
    * Options for creating ORC file writers.
    */
@@ -253,6 +285,7 @@ public class OrcFile {
     private double paddingTolerance;
     private String bloomFilterColumns;
     private double bloomFilterFpp;
+    private BloomFilterVersion bloomFilterVersion;
 
     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -286,6 +319,10 @@ public class OrcFile {
           conf);
       bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
           conf);
+      bloomFilterVersion =
+          BloomFilterVersion.fromString(
+              OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties,
+                  conf));
     }
 
     /**
@@ -430,6 +467,14 @@ public class OrcFile {
     }
 
     /**
+     * Set the version of the bloom filters to write.
+     */
+    public WriterOptions bloomFilterVersion(BloomFilterVersion version) {
+      this.bloomFilterVersion = version;
+      return this;
+    }
+
+    /**
      * A package local option to set the memory manager.
      */
     protected WriterOptions memory(MemoryManager value) {
@@ -508,6 +553,10 @@ public class OrcFile {
     public double getBloomFilterFpp() {
       return bloomFilterFpp;
     }
+
+    public BloomFilterVersion getBloomFilterVersion() {
+      return bloomFilterVersion;
+    }
   }
 
   /**
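
For example, a writer can be pinned to the compatibility mode, and a reader
can gate its trust in bloom filters on the writer version; a sketch assuming
the usual OrcFile.writerOptions entry point:

    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
        .bloomFilterColumns("name")
        .bloomFilterVersion(OrcFile.BloomFilterVersion.ORIGINAL);

    // any file written at or after ORC_101 has trustworthy utf8 filters
    boolean fixed = fileVersion.includes(OrcFile.WriterVersion.ORC_101);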

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java
index da9fe49..bc6787d 100644
--- a/java/core/src/java/org/apache/orc/TypeDescription.java
+++ b/java/core/src/java/org/apache/orc/TypeDescription.java
@@ -842,4 +842,30 @@ public class TypeDescription
     printJsonToBuffer("", buffer, 0);
     return buffer.toString();
   }
+
+  /**
+   * Locate a subtype by its id.
+   * @param goal the column id to look for
+   * @return the subtype
+   */
+  public TypeDescription findSubtype(int goal) {
+    // call getId method to make sure the ids are assigned
+    int id = getId();
+    if (goal < id || goal > maxId) {
+      throw new IllegalArgumentException("Unknown type id " + goal + " in " +
+          toJson());
+    }
+    if (goal == id) {
+      return this;
+    } else {
+      TypeDescription prev = null;
+      for(TypeDescription next: children) {
+        if (next.id > goal) {
+          return prev.findSubtype(goal);
+        }
+        prev = next;
+      }
+      return prev.findSubtype(goal);
+    }
+  }
 }
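
Ids are assigned in a pre-order walk with the root struct as 0, so for a
simple schema (a sketch; findSubtype throws on ids outside the schema):

    TypeDescription schema =
        TypeDescription.fromString("struct<name:string,age:int>");
    schema.findSubtype(1).getCategory();  // STRING, the name column
    schema.findSubtype(2).getCategory();  // INT, the age column
    schema.findSubtype(3);                // throws IllegalArgumentException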

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index 36b9a20..20e0faa 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -1408,7 +1408,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     public void setConvertVectorElement(int elementNum) {
       long longValue = longColVector.vector[elementNum];
       String string = anyIntegerAsLongTreeReader.getString(longValue);
-      byte[] bytes = string.getBytes();
+      byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
       assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
     }
 
@@ -1450,7 +1450,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       float floatValue = (float) doubleColVector.vector[elementNum];
       if (!Float.isNaN(floatValue)) {
         String string = String.valueOf(floatValue);
-        byte[] bytes = string.getBytes();
+        byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
         assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
       } else {
         bytesColVector.noNulls = false;
@@ -1495,7 +1495,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       double doubleValue = doubleColVector.vector[elementNum];
       if (!Double.isNaN(doubleValue)) {
         String string = String.valueOf(doubleValue);
-        byte[] bytes = string.getBytes();
+        byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
         assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
       } else {
         bytesColVector.noNulls = false;
@@ -1544,7 +1544,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void setConvertVectorElement(int elementNum) {
      String string = decimalColVector.vector[elementNum].getHiveDecimal().toString();
-      byte[] bytes = string.getBytes();
+      byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
       assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
     }
 
@@ -1584,7 +1584,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     public void setConvertVectorElement(int elementNum) throws IOException {
       String string =
           timestampColVector.asScratchTimestamp(elementNum).toString();
-      byte[] bytes = string.getBytes();
+      byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
       assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
     }
 
@@ -1626,7 +1626,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     public void setConvertVectorElement(int elementNum) throws IOException {
       date.setTime(DateWritable.daysToMillis((int) longColVector.vector[elementNum]));
       String string = date.toString();
-      byte[] bytes = string.getBytes();
+      byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
       assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
     }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcIndex.java b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
index 50a15f2..edcb3ba 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcIndex.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
@@ -22,10 +22,14 @@ import org.apache.orc.OrcProto;
 
 public final class OrcIndex {
   OrcProto.RowIndex[] rowGroupIndex;
+  OrcProto.Stream.Kind[] bloomFilterKinds;
   OrcProto.BloomFilterIndex[] bloomFilterIndex;
 
-  public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+  public OrcIndex(OrcProto.RowIndex[] rgIndex,
+                  OrcProto.Stream.Kind[] bloomFilterKinds,
+                  OrcProto.BloomFilterIndex[] bfIndex) {
     this.rowGroupIndex = rgIndex;
+    this.bloomFilterKinds = bloomFilterKinds;
     this.bloomFilterIndex = bfIndex;
   }
 
@@ -37,6 +41,10 @@ public final class OrcIndex {
     return bloomFilterIndex;
   }
 
+  public OrcProto.Stream.Kind[] getBloomFilterKinds() {
+    return bloomFilterKinds;
+  }
+
   public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
     this.rowGroupIndex = rowGroupIndex;
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index e8ad54d..c7ce2bb 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -27,7 +27,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.orc.BloomFilterIO;
+import org.apache.orc.OrcFile;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
 import org.apache.orc.BooleanColumnStatistics;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
@@ -88,10 +90,13 @@ public class RecordReaderImpl implements RecordReader {
   private final TreeReaderFactory.TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+  private final OrcProto.Stream.Kind[] bloomFilterKind;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final DataReader dataReader;
+  private final boolean ignoreNonUtf8BloomFilter;
+  private final OrcFile.WriterVersion writerVersion;
 
   /**
    * Given a list of column names, find the given column and return the index.
@@ -134,6 +139,7 @@ public class RecordReaderImpl implements RecordReader {
   protected RecordReaderImpl(ReaderImpl fileReader,
                              Reader.Options options) throws IOException {
     this.included = options.getInclude();
+    this.writerVersion = fileReader.getWriterVersion();
     included[0] = true;
     if (options.getSchema() == null) {
       if (LOG.isInfoEnabled()) {
@@ -162,11 +168,14 @@ public class RecordReaderImpl implements RecordReader {
     this.types = fileReader.types;
     this.bufferSize = fileReader.bufferSize;
     this.rowIndexStride = fileReader.rowIndexStride;
+    this.ignoreNonUtf8BloomFilter =
+        OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
     SearchArgument sarg = options.getSearchArgument();
     if (sarg != null && rowIndexStride != 0) {
       sargApp = new SargApplier(sarg, options.getColumnNames(),
                                 rowIndexStride,
-                                included.length, evolution);
+                                included.length, evolution,
+                                writerVersion);
     } else {
       sargApp = null;
     }
@@ -218,6 +227,7 @@ public class RecordReaderImpl implements RecordReader {
     writerIncluded = evolution.getFileIncluded();
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+    bloomFilterKind = new OrcProto.Stream.Kind[types.size()];
     advanceToNextRow(reader, 0L, true);
   }
 
@@ -339,20 +349,23 @@ public class RecordReaderImpl implements RecordReader {
    * that is referenced in the predicate.
    * @param statsProto the statistics for the column mentioned in the predicate
   * @param predicate the leaf predicate we need to evaluate
-   * @param bloomFilter
+   * @param bloomFilter the bloom filter
+   * @param writerVersion the version of software that wrote the file
+   * @param type the category of the column
    * @return the set of truth values that may be returned for the given
    *   predicate.
    */
   static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
-      PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
+                                           PredicateLeaf predicate,
+                                           OrcProto.Stream.Kind kind,
+                                           OrcProto.BloomFilter bloomFilter,
+                                           OrcFile.WriterVersion writerVersion,
+                                           TypeDescription.Category type) {
     ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
     Object minValue = getMin(cs);
     Object maxValue = getMax(cs);
-    BloomFilterIO bf = null;
-    if (bloomFilter != null) {
-      bf = new BloomFilterIO(bloomFilter);
-    }
-    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
+    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(),
+        BloomFilterIO.deserialize(kind, writerVersion, type, bloomFilter));
   }
 
   /**
@@ -365,14 +378,14 @@ public class RecordReaderImpl implements RecordReader {
    */
   public static TruthValue evaluatePredicate(ColumnStatistics stats,
                                              PredicateLeaf predicate,
-                                             BloomFilterIO bloomFilter) {
+                                             BloomFilter bloomFilter) {
     Object minValue = getMin(stats);
     Object maxValue = getMax(stats);
     return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
   }
 
   static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
-      Object max, boolean hasNull, BloomFilterIO bloomFilter) {
+      Object max, boolean hasNull, BloomFilter bloomFilter) {
     // if we didn't have any values, everything must have been null
     if (min == null) {
       if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
@@ -421,7 +434,7 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
-      TruthValue result, BloomFilterIO bloomFilter) {
+      TruthValue result, BloomFilter bloomFilter) {
     // evaluate bloom filter only when
     // 1) Bloom filter is available
     // 2) Min/Max evaluation yield YES or MAYBE
@@ -531,7 +544,7 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
-      final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
+      final Object predObj, BloomFilter bloomFilter, boolean hasNull) {
     switch (predicate.getOperator()) {
       case NULL_SAFE_EQUALS:
         // null safe equals does not return *_NULL variant. So set hasNull to false
@@ -553,7 +566,7 @@ public class RecordReaderImpl implements RecordReader {
     }
   }
 
-  private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
+  private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull) {
     TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
 
     if (predObj instanceof Long) {
@@ -708,6 +721,7 @@ public class RecordReaderImpl implements RecordReader {
     public final static boolean[] READ_ALL_RGS = null;
     public final static boolean[] READ_NO_RGS = new boolean[0];
 
+    private final OrcFile.WriterVersion writerVersion;
     private final SearchArgument sarg;
     private final List<PredicateLeaf> sargLeaves;
     private final int[] filterColumns;
@@ -716,10 +730,13 @@ public class RecordReaderImpl implements RecordReader {
     private final boolean[] sargColumns;
     private SchemaEvolution evolution;
 
-    public SargApplier(SearchArgument sarg, String[] columnNames,
+    public SargApplier(SearchArgument sarg,
+                       String[] columnNames,
                        long rowIndexStride,
                        int includedCount,
-                       SchemaEvolution evolution) {
+                       SchemaEvolution evolution,
+                       OrcFile.WriterVersion writerVersion) {
+      this.writerVersion = writerVersion;
       this.sarg = sarg;
       sargLeaves = sarg.getLeaves();
       filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves,
@@ -745,8 +762,11 @@ public class RecordReaderImpl implements RecordReader {
      * row groups must be read.
      * @throws IOException
      */
-    public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
-        OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
+    public boolean[] pickRowGroups(StripeInformation stripe,
+                                   OrcProto.RowIndex[] indexes,
+                                   OrcProto.Stream.Kind[] bloomFilterKinds,
+                                   OrcProto.BloomFilterIndex[] bloomFilterIndices,
+                                   boolean returnNone) throws IOException {
       long rowsInStripe = stripe.getNumberOfRows();
       int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
       boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
@@ -765,11 +785,15 @@ public class RecordReaderImpl implements RecordReader {
             }
             OrcProto.ColumnStatistics stats = entry.getStatistics();
             OrcProto.BloomFilter bf = null;
+            OrcProto.Stream.Kind bfk = null;
             if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
+              bfk = bloomFilterKinds[columnIx];
               bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
             }
             if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
-              leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+              leafValues[pred] = evaluatePredicateProto(stats,
+                  sargLeaves.get(pred), bfk, bf, writerVersion,
+                  evolution.getFileSchema().findSubtype(columnIx).getCategory());
             } else {
               leafValues[pred] = TruthValue.YES_NO_NULL;
             }
@@ -809,7 +833,8 @@ public class RecordReaderImpl implements RecordReader {
       return null;
     }
     readRowIndex(currentStripe, writerIncluded, sargApp.sargColumns);
-    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes,
+        bloomFilterKind, bloomFilterIndices, false);
   }
 
   private void clearStreams() {
@@ -1168,8 +1193,9 @@ public class RecordReaderImpl implements RecordReader {
       sargColumns = sargColumns == null ?
           (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
     }
-    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
-        bloomFilterIndex);
+    return dataReader.readRowIndex(stripe, evolution.getFileType(0), stripeFooter,
+        ignoreNonUtf8BloomFilter, included, indexes, sargColumns, writerVersion,
+        bloomFilterKind, bloomFilterIndex);
   }
 
   private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
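
Note the asymmetry in checkInBloomFilter above: a bloom filter can prove
absence but never presence, so a hit only leaves the answer at "maybe". A
hypothetical helper showing the shape of the truth values for the long case:

    static TruthValue check(BloomFilter bf, long value, boolean hasNull) {
      if (bf.testLong(value)) {
        // maybe present: cannot rule the row group in or out
        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
      }
      // definitely absent: the row group can be skipped
      return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
    }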

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 3d57732..cadee35 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
+import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 
 import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
 
 /**
  * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
@@ -44,6 +44,100 @@ import org.apache.orc.StripeInformation;
 public class RecordReaderUtils {
   private static final HadoopShims SHIMS = HadoopShims.Factory.get();
 
+  static boolean hadBadBloomFilters(TypeDescription.Category category,
+                                    OrcFile.WriterVersion version) {
+    switch(category) {
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return !version.includes(OrcFile.WriterVersion.HIVE_12055);
+      case DECIMAL:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Plans the list of disk ranges needed to read the given stripe's indexes.
+   * All of the positions are relative to the start of the stripe.
+   * @param fileSchema the schema for the file
+   * @param footer the stripe footer
+   * @param ignoreNonUtf8BloomFilter should the reader ignore non-utf8
+   *                                 encoded bloom filters
+   * @param fileIncluded the columns (indexed by file columns) that should be
+   *                     read
+   * @param sargColumns true for the columns (indexed by file columns) that
+   *                    we need bloom filters for
+   * @param version the version of the software that wrote the file
+   * @param bloomFilterKinds (output) the stream kind of the bloom filters
+   * @return a list of merged disk ranges to read
+   */
+  static DiskRangeList planIndexReading(TypeDescription fileSchema,
+                                        OrcProto.StripeFooter footer,
+                                        boolean ignoreNonUtf8BloomFilter,
+                                        boolean[] fileIncluded,
+                                        boolean[] sargColumns,
+                                        OrcFile.WriterVersion version,
+                                        OrcProto.Stream.Kind[] bloomFilterKinds) {
+    DiskRangeList.CreateHelper result = new DiskRangeList.CreateHelper();
+    List<OrcProto.Stream> streams = footer.getStreamsList();
+    // figure out which kind of bloom filter we want for each column
+    // picks bloom_filter_utf8 if it's available, otherwise bloom_filter
+    if (sargColumns != null) {
+      for (OrcProto.Stream stream : streams) {
+        if (stream.hasKind() && stream.hasColumn()) {
+          int column = stream.getColumn();
+          if (sargColumns[column]) {
+            switch (stream.getKind()) {
+              case BLOOM_FILTER:
+                if (bloomFilterKinds[column] == null &&
+                    !(ignoreNonUtf8BloomFilter &&
+                        hadBadBloomFilters(fileSchema.findSubtype(column)
+                            .getCategory(), version))) {
+                  bloomFilterKinds[column] = OrcProto.Stream.Kind.BLOOM_FILTER;
+                }
+                break;
+              case BLOOM_FILTER_UTF8:
+                bloomFilterKinds[column] = OrcProto.Stream.Kind.BLOOM_FILTER_UTF8;
+                break;
+              default:
+                break;
+            }
+          }
+        }
+      }
+    }
+    long offset = 0;
+    for(OrcProto.Stream stream: footer.getStreamsList()) {
+      if (stream.hasKind() && stream.hasColumn()) {
+        int column = stream.getColumn();
+        if (fileIncluded == null || fileIncluded[column]) {
+          boolean needStream = false;
+          switch (stream.getKind()) {
+            case ROW_INDEX:
+              needStream = true;
+              break;
+            case BLOOM_FILTER:
+              needStream = bloomFilterKinds[column] == OrcProto.Stream.Kind.BLOOM_FILTER;
+              break;
+            case BLOOM_FILTER_UTF8:
+              needStream = bloomFilterKinds[column] == OrcProto.Stream.Kind.BLOOM_FILTER_UTF8;
+              break;
+            default:
+              // PASS
+              break;
+          }
+          if (needStream) {
+            result.addOrMerge(offset, offset + stream.getLength(), true, false);
+          }
+        }
+      }
+      offset += stream.getLength();
+    }
+    return result.get();
+  }
+
   private static class DefaultDataReader implements DataReader {
     private FSDataInputStream file = null;
     private final ByteBufferAllocatorPool pool;
@@ -91,10 +185,14 @@ public class RecordReaderUtils {
 
     @Override
     public OrcIndex readRowIndex(StripeInformation stripe,
+                                 TypeDescription fileSchema,
                                  OrcProto.StripeFooter footer,
+                                 boolean ignoreNonUtf8BloomFilter,
                                  boolean[] included,
                                  OrcProto.RowIndex[] indexes,
                                  boolean[] sargColumns,
+                                 OrcFile.WriterVersion version,
+                                 OrcProto.Stream.Kind[] bloomFilterKinds,
                                  OrcProto.BloomFilterIndex[] bloomFilterIndices
                                  ) throws IOException {
       if (file == null) {
@@ -106,49 +204,61 @@ public class RecordReaderUtils {
       if (indexes == null) {
         indexes = new OrcProto.RowIndex[typeCount];
       }
+      if (bloomFilterKinds == null) {
+        bloomFilterKinds = new OrcProto.Stream.Kind[typeCount];
+      }
       if (bloomFilterIndices == null) {
         bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
       }
-      long offset = stripe.getOffset();
-      List<OrcProto.Stream> streams = footer.getStreamsList();
-      for (int i = 0; i < streams.size(); i++) {
-        OrcProto.Stream stream = streams.get(i);
-        OrcProto.Stream nextStream = null;
-        if (i < streams.size() - 1) {
-          nextStream = streams.get(i+1);
+      DiskRangeList ranges = planIndexReading(fileSchema, footer,
+          ignoreNonUtf8BloomFilter, included, sargColumns, version,
+          bloomFilterKinds);
+      ranges = readDiskRanges(file, zcr, stripe.getOffset(), ranges, false);
+      long offset = 0;
+      DiskRangeList range = ranges;
+      for(OrcProto.Stream stream: footer.getStreamsList()) {
+        // advance to find the next range
+        while (range != null && range.getEnd() <= offset) {
+          range = range.next;
         }
-        int col = stream.getColumn();
-        int len = (int) stream.getLength();
-        // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
-        // filter and combine the io to read row index and bloom filters for that column together
-        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
-          boolean readBloomFilter = false;
-          if (sargColumns != null && sargColumns[col] &&
-              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
-            len += nextStream.getLength();
-            i += 1;
-            readBloomFilter = true;
-          }
-          if ((included == null || included[col]) && indexes[col] == null) {
-            byte[] buffer = new byte[len];
-            file.readFully(offset, buffer, 0, buffer.length);
-            ByteBuffer bb = ByteBuffer.wrap(buffer);
-            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-                ReaderImpl.singleton(new BufferChunk(bb, 0)), 
stream.getLength(),
-                codec, bufferSize));
-            if (readBloomFilter) {
-              bb.position((int) stream.getLength());
-              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
-                  "bloom_filter", ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                  nextStream.getLength(), codec, bufferSize));
-            }
+        // no more ranges, so we are done
+        if (range == null) {
+          break;
+        }
+        int column = stream.getColumn();
+        if (stream.hasKind() && range.getOffset() <= offset) {
+          switch (stream.getKind()) {
+            case ROW_INDEX:
+              if (included == null || included[column]) {
+                ByteBuffer bb = range.getData().duplicate();
+                bb.position((int) (offset - range.getOffset()));
+                bb.limit((int) (bb.position() + stream.getLength()));
+                indexes[column] = OrcProto.RowIndex.parseFrom(
+                    InStream.createCodedInputStream("index",
+                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
+                        stream.getLength(),
+                    codec, bufferSize));
+              }
+              break;
+            case BLOOM_FILTER:
+            case BLOOM_FILTER_UTF8:
+              if (sargColumns != null && sargColumns[column]) {
+                ByteBuffer bb = range.getData().duplicate();
+                bb.position((int) (offset - range.getOffset()));
+                bb.limit((int) (bb.position() + stream.getLength()));
+                bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
+                    (InStream.createCodedInputStream("bloom_filter",
+                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
+                    stream.getLength(), codec, bufferSize));
+              }
+              break;
+            default:
+              break;
           }
         }
-        offset += len;
+        offset += stream.getLength();
       }
-
-      OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
-      return index;
+      return new OrcIndex(indexes, bloomFilterKinds, bloomFilterIndices);
     }
 
     @Override
@@ -234,14 +344,14 @@ public class RecordReaderUtils {
   }
 
   public static void addEntireStreamToRanges(
-      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+      long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
     list.addOrMerge(offset, offset + length, doMergeBuffers, false);
   }
 
   public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
-      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+      long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
     for (int group = 0; group < includedRowGroups.length; ++group) {
       if (!includedRowGroups[group]) continue;
       int posn = getIndexPosition(
@@ -399,7 +509,7 @@ public class RecordReaderUtils {
     if (range == null) return null;
     DiskRangeList prev = range.prev;
     if (prev == null) {
-      prev = new MutateHelper(range);
+      prev = new DiskRangeList.MutateHelper(range);
     }
     while (range != null) {
       if (range.hasData()) {
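
Given hadBadBloomFilters above, old BLOOM_FILTER streams are only trusted
when their encoding could not have depended on the platform charset; some
expected outcomes, assuming the semantics shown:

    hadBadBloomFilters(TypeDescription.Category.STRING,
        OrcFile.WriterVersion.HIVE_4243);   // true: ignorable if configured
    hadBadBloomFilters(TypeDescription.Category.STRING,
        OrcFile.WriterVersion.HIVE_12055);  // false: usable
    hadBadBloomFilters(TypeDescription.Category.DECIMAL,
        OrcFile.WriterVersion.ORC_101);     // true: old decimal filters are
                                            // never trusted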

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index 1e11728..20adfd8 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -153,6 +153,10 @@ public class SchemaEvolution {
     return hasConversion;
   }
 
+  public TypeDescription getFileSchema() {
+    return fileSchema;
+  }
+
   public TypeDescription getFileType(TypeDescription readerType) {
     return getFileType(readerType.getId());
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/StreamName.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/StreamName.java b/java/core/src/java/org/apache/orc/impl/StreamName.java
index b3fd145..e3561bf 100644
--- a/java/core/src/java/org/apache/orc/impl/StreamName.java
+++ b/java/core/src/java/org/apache/orc/impl/StreamName.java
@@ -78,6 +78,7 @@ public class StreamName implements Comparable<StreamName> {
       case ROW_INDEX:
       case DICTIONARY_COUNT:
       case BLOOM_FILTER:
+      case BLOOM_FILTER_UTF8:
         return Area.INDEX;
       default:
         return Area.DATA;

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 3df1b76..940ef59 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -21,6 +21,7 @@ package org.apache.orc.impl;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,12 +35,10 @@ import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoCompressor;
 import io.airlift.compress.lzo.LzoDecompressor;
-import io.airlift.compress.snappy.SnappyCompressor;
-import io.airlift.compress.snappy.SnappyDecompressor;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.BloomFilterIO;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
@@ -50,6 +49,7 @@ import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.util.BloomFilterUtf8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -147,6 +147,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final OrcFile.CompressionStrategy compressionStrategy;
   private final boolean[] bloomFilterColumns;
   private final double bloomFilterFpp;
+  private final OrcFile.BloomFilterVersion bloomFilterVersion;
   private boolean writeTimeZone;
 
   public WriterImpl(FileSystem fs,
@@ -157,6 +158,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.conf = opts.getConfiguration();
     this.callback = opts.getCallback();
     this.schema = opts.getSchema();
+    bloomFilterVersion = opts.getBloomFilterVersion();
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -426,6 +428,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         case BLOOM_FILTER:
         case DATA:
         case DICTIONARY_DATA:
+        case BLOOM_FILTER_UTF8:
           if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
             modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
                 CompressionCodec.Modifier.TEXT);
@@ -543,6 +546,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     public boolean hasWriterTimeZone() {
       return writeTimeZone;
     }
+
+    public OrcFile.BloomFilterVersion getBloomFilterVersion() {
+      return bloomFilterVersion;
+    }
   }
 
   /**
@@ -564,9 +571,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
     private final PositionedOutputStream rowIndexStream;
     private final PositionedOutputStream bloomFilterStream;
-    protected final BloomFilterIO bloomFilter;
+    private final PositionedOutputStream bloomFilterStreamUtf8;
+    protected final BloomFilter bloomFilter;
+    protected final BloomFilterUtf8 bloomFilterUtf8;
     protected final boolean createBloomFilter;
     private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
     private final OrcProto.BloomFilter.Builder bloomFilterEntry;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
@@ -612,15 +622,30 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
       if (createBloomFilter) {
         bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
-        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
-        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+          if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
+          bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+              streamFactory.getBloomFilterFPP());
+          bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+          bloomFilterStream = streamFactory.createStream(id,
+              OrcProto.Stream.Kind.BLOOM_FILTER);
+        } else {
+          bloomFilter = null;
+          bloomFilterIndex = null;
+          bloomFilterStream = null;
+        }
+        bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
             streamFactory.getBloomFilterFPP());
+        bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
+        bloomFilterStreamUtf8 = streamFactory.createStream(id,
+              OrcProto.Stream.Kind.BLOOM_FILTER_UTF8);
       } else {
         bloomFilterEntry = null;
         bloomFilterIndex = null;
+        bloomFilterIndexUtf8 = null;
+        bloomFilterStreamUtf8 = null;
         bloomFilterStream = null;
         bloomFilter = null;
+        bloomFilterUtf8 = null;
       }
     }
 
@@ -788,7 +813,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         bloomFilterIndex.build().writeTo(bloomFilterStream);
         bloomFilterStream.flush();
         bloomFilterIndex.clear();
-        bloomFilterEntry.clear();
+      }
+      // write the bloom filter to out stream
+      if (bloomFilterStreamUtf8 != null) {
+        bloomFilterIndexUtf8.build().writeTo(bloomFilterStreamUtf8);
+        bloomFilterStreamUtf8.flush();
+        bloomFilterIndexUtf8.clear();
       }
     }
 
@@ -837,12 +867,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     void addBloomFilterEntry() {
       if (createBloomFilter) {
-        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
-        bloomFilterEntry.addAllBitset(Arrays.asList(ArrayUtils.toObject(
-            bloomFilter.getBitSet())));
-        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
-        bloomFilter.reset();
-        bloomFilterEntry.clear();
+        if (bloomFilter != null) {
+          BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
+          bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+          bloomFilter.reset();
+        }
+        if (bloomFilterUtf8 != null) {
+          BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
+          bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
+          bloomFilterUtf8.reset();
+        }
       }
     }
 
@@ -946,7 +980,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           byte value = (byte) vec.vector[0];
           indexStatistics.updateInteger(value, length);
           if (createBloomFilter) {
-            bloomFilter.addLong(value);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
           }
           for(int i=0; i < length; ++i) {
             writer.write(value);
@@ -959,7 +996,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             writer.write(value);
             indexStatistics.updateInteger(value, 1);
             if (createBloomFilter) {
-              bloomFilter.addLong(value);
+              if (bloomFilter != null) {
+                bloomFilter.addLong(value);
+              }
+              bloomFilterUtf8.addLong(value);
             }
           }
         }
@@ -1017,7 +1057,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           long value = vec.vector[0];
           indexStatistics.updateInteger(value, length);
           if (createBloomFilter) {
-            bloomFilter.addLong(value);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
           }
           for(int i=0; i < length; ++i) {
             writer.write(value);
@@ -1030,7 +1073,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             writer.write(value);
             indexStatistics.updateInteger(value, 1);
             if (createBloomFilter) {
-              bloomFilter.addLong(value);
+              if (bloomFilter != null) {
+                bloomFilter.addLong(value);
+              }
+              bloomFilterUtf8.addLong(value);
             }
           }
         }
@@ -1077,7 +1123,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           float value = (float) vec.vector[0];
           indexStatistics.updateDouble(value);
           if (createBloomFilter) {
-            bloomFilter.addDouble(value);
+            if (bloomFilter != null) {
+              bloomFilter.addDouble(value);
+            }
+            bloomFilterUtf8.addDouble(value);
           }
           for(int i=0; i < length; ++i) {
             utils.writeFloat(stream, value);
@@ -1090,7 +1139,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             utils.writeFloat(stream, value);
             indexStatistics.updateDouble(value);
             if (createBloomFilter) {
-              bloomFilter.addDouble(value);
+              if (bloomFilter != null) {
+                bloomFilter.addDouble(value);
+              }
+              bloomFilterUtf8.addDouble(value);
             }
           }
         }
@@ -1138,7 +1190,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           double value = vec.vector[0];
           indexStatistics.updateDouble(value);
           if (createBloomFilter) {
-            bloomFilter.addDouble(value);
+            if (bloomFilter != null) {
+              bloomFilter.addDouble(value);
+            }
+            bloomFilterUtf8.addDouble(value);
           }
           for(int i=0; i < length; ++i) {
             utils.writeDouble(stream, value);
@@ -1151,7 +1206,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             utils.writeDouble(stream, value);
             indexStatistics.updateDouble(value);
             if (createBloomFilter) {
-              bloomFilter.addDouble(value);
+              if (bloomFilter != null) {
+                bloomFilter.addDouble(value);
+              }
+              bloomFilterUtf8.addDouble(value);
             }
           }
         }
@@ -1430,7 +1488,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           indexStatistics.updateString(vec.vector[0], vec.start[0],
               vec.length[0], length);
           if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+            if (bloomFilter != null) {
+              // translate from UTF-8 to the default charset
+              bloomFilter.addString(new String(vec.vector[0], vec.start[0],
+                  vec.length[0], StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
           }
         }
       } else {
@@ -1447,7 +1510,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             indexStatistics.updateString(vec.vector[offset + i],
                 vec.start[offset + i], vec.length[offset + i], 1);
             if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
+              if (bloomFilter != null) {
+                // translate from UTF-8 to the default charset
+                bloomFilter.addString(new String(vec.vector[offset + i],
+                    vec.start[offset + i], vec.length[offset + i],
+                    StandardCharsets.UTF_8));
+              }
+              bloomFilterUtf8.addBytes(vec.vector[offset + i],
                   vec.start[offset + i], vec.length[offset + i]);
             }
           }
@@ -1504,7 +1573,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           }
           indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
           if (createBloomFilter) {
-            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+            if (bloomFilter != null) {
+              // translate from UTF-8 to the default charset
+              bloomFilter.addString(new String(ptr, ptrOffset, itemLength,
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(ptr, ptrOffset, itemLength);
           }
         }
       } else {
@@ -1531,7 +1605,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             }
             indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
             if (createBloomFilter) {
-              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+              if (bloomFilter != null) {
+                // translate from UTF-8 to the default charset
+                bloomFilter.addString(new String(ptr, ptrOffset, itemLength,
+                    StandardCharsets.UTF_8));
+              }
+              bloomFilterUtf8.addBytes(ptr, ptrOffset, itemLength);
             }
           }
         }
@@ -1576,7 +1657,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           indexStatistics.updateString(vec.vector[0], vec.start[0],
               itemLength, length);
           if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+            if (bloomFilter != null) {
+              // translate from UTF-8 to the default charset
+              bloomFilter.addString(new String(vec.vector[0],
+                  vec.start[0], itemLength,
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[0],
+                vec.start[0], itemLength);
           }
         }
       } else {
@@ -1594,7 +1682,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             indexStatistics.updateString(vec.vector[offset + i],
                 vec.start[offset + i], itemLength, 1);
             if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
+              if (bloomFilter != null) {
+                // translate from UTF-8 to the default charset
+                bloomFilter.addString(new String(vec.vector[offset + i],
+                    vec.start[offset + i], itemLength,
+                    StandardCharsets.UTF_8));
+              }
+              bloomFilterUtf8.addBytes(vec.vector[offset + i],
                   vec.start[offset + i], itemLength);
             }
           }
@@ -1646,7 +1740,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           indexStatistics.updateBinary(vec.vector[0], vec.start[0],
               vec.length[0], length);
           if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+            if (bloomFilter != null) {
+              bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+            }
+            bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
           }
         }
       } else {
@@ -1658,7 +1755,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             indexStatistics.updateBinary(vec.vector[offset + i],
                 vec.start[offset + i], vec.length[offset + i], 1);
             if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
+              if (bloomFilter != null) {
+                bloomFilter.addBytes(vec.vector[offset + i],
+                    vec.start[offset + i], vec.length[offset + i]);
+              }
+              bloomFilterUtf8.addBytes(vec.vector[offset + i],
                   vec.start[offset + i], vec.length[offset + i]);
             }
           }
@@ -1734,7 +1835,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           long millis = val.getTime();
           indexStatistics.updateTimestamp(millis);
           if (createBloomFilter) {
-            bloomFilter.addLong(millis);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(millis);
+            }
+            bloomFilterUtf8.addLong(millis);
           }
           final long secs = millis / MILLIS_PER_SECOND - base_timestamp;
           final long nano = formatNanos(val.getNanos());
@@ -1753,7 +1857,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             nanos.write(formatNanos(val.getNanos()));
             indexStatistics.updateTimestamp(millis);
             if (createBloomFilter) {
-              bloomFilter.addLong(millis);
+              if (bloomFilter != null) {
+                bloomFilter.addLong(millis);
+              }
+              bloomFilterUtf8.addLong(millis);
             }
           }
         }
@@ -1819,7 +1926,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           int value = (int) vec.vector[0];
           indexStatistics.updateDate(value);
           if (createBloomFilter) {
-            bloomFilter.addLong(value);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
           }
           for(int i=0; i < length; ++i) {
             writer.write(value);
@@ -1832,7 +1942,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             writer.write(value);
             indexStatistics.updateDate(value);
             if (createBloomFilter) {
-              bloomFilter.addLong(value);
+              if (bloomFilter != null) {
+                bloomFilter.addLong(value);
+              }
+              bloomFilterUtf8.addLong(value);
             }
           }
         }
@@ -1901,7 +2014,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           HiveDecimal value = vec.vector[0].getHiveDecimal();
           indexStatistics.updateDecimal(value);
           if (createBloomFilter) {
-            bloomFilter.addString(value.toString());
+            String str = value.toString();
+            if (bloomFilter != null) {
+              bloomFilter.addString(str);
+            }
+            bloomFilterUtf8.addString(str);
           }
           for(int i=0; i < length; ++i) {
             SerializationUtils.writeBigInteger(valueStream,
@@ -1918,7 +2035,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             scaleStream.write(value.scale());
             indexStatistics.updateDecimal(value);
             if (createBloomFilter) {
-              bloomFilter.addString(value.toString());
+              String str = value.toString();
+              if (bloomFilter != null) {
+                bloomFilter.addString(str);
+              }
+              bloomFilterUtf8.addString(str);
             }
           }
         }
@@ -2065,7 +2186,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
           }
           if (createBloomFilter) {
-            bloomFilter.addLong(childLength);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(childLength);
+            }
+            bloomFilterUtf8.addLong(childLength);
           }
         }
       } else {
@@ -2088,6 +2212,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             } else {
               currentLength += nextLength;
             }
+            if (createBloomFilter) {
+              if (bloomFilter != null) {
+                bloomFilter.addLong(nextLength);
+              }
+              bloomFilterUtf8.addLong(nextLength);
+            }
           }
         }
         if (currentLength != 0) {
@@ -2161,7 +2291,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
           }
           if (createBloomFilter) {
-            bloomFilter.addLong(childLength);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(childLength);
+            }
+            bloomFilterUtf8.addLong(childLength);
           }
         }
       } else {
@@ -2186,6 +2319,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             } else {
               currentLength += nextLength;
             }
+            if (createBloomFilter) {
+              if (bloomFilter != null) {
+                bloomFilter.addLong(nextLength);
+              }
+              bloomFilterUtf8.addLong(nextLength);
+            }
           }
         }
         if (currentLength != 0) {
@@ -2247,7 +2386,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             tags.write(tag);
           }
           if (createBloomFilter) {
-            bloomFilter.addLong(tag);
+            if (bloomFilter != null) {
+              bloomFilter.addLong(tag);
+            }
+            bloomFilterUtf8.addLong(tag);
           }
           childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
         }
@@ -2275,6 +2417,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
               currentStart[tag] = i + offset;
               currentLength[tag] = 1;
             }
+            if (createBloomFilter) {
+              if (bloomFilter != null) {
+                bloomFilter.addLong(tag);
+              }
+              bloomFilterUtf8.addLong(tag);
+            }
           }
         }
         // write out any left over sequences
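
Every WriterImpl hunk above applies the same transformation: wherever the old
code fed a value to the single bloom filter, the new code feeds the (now
optional) legacy filter and, unconditionally, the new UTF-8 filter, so readers
from before and after this change both get a usable index. A minimal sketch of
the string case, using the field names from the diff; the wrapper method
itself is hypothetical, not part of the commit:

    // Hypothetical condensation of the per-value logic repeated above.
    // bloomFilter may be null when the writer is configured to emit only the
    // new UTF-8 stream; bloomFilterUtf8 is always present when bloom filters
    // are enabled.
    private void addStringToBloomFilters(byte[] utf8, int offset, int length) {
      if (bloomFilter != null) {
        // The legacy filter hashed strings in the JVM default charset, so the
        // UTF-8 bytes are decoded back to a String before being added.
        bloomFilter.addString(
            new String(utf8, offset, length, StandardCharsets.UTF_8));
      }
      // The UTF-8 filter hashes the raw UTF-8 bytes directly.
      bloomFilterUtf8.addBytes(utf8, offset, length);
    }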

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilter.java b/java/core/src/java/org/apache/orc/util/BloomFilter.java
new file mode 100644
index 0000000..a6ff741
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilter.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership checks.
+ * Bloom filters are highly space efficient compared to a HashSet. Because of
+ * their probabilistic nature, false positives (test() returns true for an
+ * element that was never added) are possible, but false negatives are not
+ * (if an element was added, test() will never return false). The false
+ * positive probability is configurable (default: 5%); the lower the false
+ * positive probability, the greater the space requirement.
+ * Bloom filters are sensitive to the number of elements they hold, so the
+ * expected number of entries must be specified at creation time. If the
+ * number of insertions exceeds that estimate, the false positive probability
+ * increases accordingly.
+ *
+ * Internally, this implementation uses the Murmur3 fast non-cryptographic
+ * hash. Although Murmur2 is slightly faster than Murmur3 in Java, it suffers
+ * from hash collisions for specific sequences of repeating bytes; see
+ * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw for details.
+ *
+ * Note that this class is here for backwards compatibility, because it uses
+ * the JVM default character set for strings. All new users should use
+ * BloomFilterUtf8, which always uses UTF-8 for the encoding.
+ */
+public class BloomFilter {
+  public static final double DEFAULT_FPP = 0.05;
+  private final BitSet bitSet;
+  private final int numBits;
+  private final int numHashFunctions;
+
+  static void checkArgument(boolean expression, String message) {
+    if (!expression) {
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  public BloomFilter(long expectedEntries) {
+    this(expectedEntries, DEFAULT_FPP);
+  }
+
+  public BloomFilter(long expectedEntries, double fpp) {
+    checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+    checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
+    int nb = optimalNumOfBits(expectedEntries, fpp);
+    // make 'm' multiple of 64
+    this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+    this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
+    this.bitSet = new BitSet(numBits);
+  }
+
+  /**
+   * A constructor to support rebuilding the BloomFilter from a serialized representation.
+   * @param bits the serialized bits
+   * @param numFuncs the number of functions used
+   */
+  public BloomFilter(long[] bits, int numFuncs) {
+    super();
+    bitSet = new BitSet(bits);
+    this.numBits = (int) bitSet.bitSize();
+    numHashFunctions = numFuncs;
+  }
+
+  static int optimalNumOfHashFunctions(long n, long m) {
+    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+  }
+
+  static int optimalNumOfBits(long n, double p) {
+    return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+  }
+
+  public void add(byte[] val) {
+    if (val == null) {
+      addBytes(val, -1, -1);
+    } else {
+      addBytes(val, 0, val.length);
+    }
+  }
+
+  public void addBytes(byte[] val, int offset, int length) {
+    // We use the trick from "Less Hashing, Same Performance: Building a
+    // Better Bloom Filter" by Kirsch et al. From the abstract: "only two
+    // hash functions are necessary to effectively implement a Bloom filter
+    // without any loss in the asymptotic false positive probability".
+
+    // Split the 64-bit hashcode into two 32-bit hash codes and employ the
+    // technique described in the paper above.
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
+    addHash(hash64);
+  }
+
+  private void addHash(long hash64) {
+    int hash1 = (int) hash64;
+    int hash2 = (int) (hash64 >>> 32);
+
+    for (int i = 1; i <= numHashFunctions; i++) {
+      int combinedHash = hash1 + (i * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      int pos = combinedHash % numBits;
+      bitSet.set(pos);
+    }
+  }
+
+  public void addString(String val) {
+    if (val == null) {
+      add(null);
+    } else {
+      add(val.getBytes(Charset.defaultCharset()));
+    }
+  }
+
+  public void addLong(long val) {
+    addHash(getLongHash(val));
+  }
+
+  public void addDouble(double val) {
+    addLong(Double.doubleToLongBits(val));
+  }
+
+  public boolean test(byte[] val) {
+    if (val == null) {
+      return testBytes(val, -1, -1);
+    }
+    return testBytes(val, 0, val.length);
+  }
+
+  public boolean testBytes(byte[] val, int offset, int length) {
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
+    return testHash(hash64);
+  }
+
+  private boolean testHash(long hash64) {
+    int hash1 = (int) hash64;
+    int hash2 = (int) (hash64 >>> 32);
+
+    for (int i = 1; i <= numHashFunctions; i++) {
+      int combinedHash = hash1 + (i * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      int pos = combinedHash % numBits;
+      if (!bitSet.get(pos)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean testString(String val) {
+    if (val == null) {
+      return test(null);
+    } else {
+      return test(val.getBytes(Charset.defaultCharset()));
+    }
+  }
+
+  public boolean testLong(long val) {
+    return testHash(getLongHash(val));
+  }
+
+  // Thomas Wang's integer hash function
+  // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+  private long getLongHash(long key) {
+    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+    key = key ^ (key >> 24);
+    key = (key + (key << 3)) + (key << 8); // key * 265
+    key = key ^ (key >> 14);
+    key = (key + (key << 2)) + (key << 4); // key * 21
+    key = key ^ (key >> 28);
+    key = key + (key << 31);
+    return key;
+  }
+
+  public boolean testDouble(double val) {
+    return testLong(Double.doubleToLongBits(val));
+  }
+
+  public long sizeInBytes() {
+    return getBitSize() / 8;
+  }
+
+  public int getBitSize() {
+    return bitSet.getData().length * Long.SIZE;
+  }
+
+  public int getNumHashFunctions() {
+    return numHashFunctions;
+  }
+
+  public long[] getBitSet() {
+    return bitSet.getData();
+  }
+
+  @Override
+  public String toString() {
+    return "m: " + numBits + " k: " + numHashFunctions;
+  }
+
+  /**
+   * Merge the specified bloom filter with current bloom filter.
+   *
+   * @param that - bloom filter to merge
+   */
+  public void merge(BloomFilter that) {
+    if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
+      this.bitSet.putAll(that.bitSet);
+    } else {
+      throw new IllegalArgumentException("BloomFilters are not compatible for 
merging." +
+          " this - " + this.toString() + " that - " + that.toString());
+    }
+  }
+
+  public void reset() {
+    this.bitSet.clear();
+  }
+
+  /**
+   * Bare metal bit set implementation. For performance reasons, this
+   * implementation does not check for index bounds nor expand the bit set
+   * size if the specified index is greater than the size.
+   */
+  public static class BitSet {
+    private final long[] data;
+
+    public BitSet(long bits) {
+      this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
+    }
+
+    /**
+     * Deserialize long array as bit set.
+     *
+     * @param data - bit array
+     */
+    public BitSet(long[] data) {
+      assert data.length > 0 : "data length is zero!";
+      this.data = data;
+    }
+
+    /**
+     * Sets the bit at specified index.
+     *
+     * @param index - position
+     */
+    public void set(int index) {
+      data[index >>> 6] |= (1L << index);
+    }
+
+    /**
+     * Returns true if the bit is set in the specified index.
+     *
+     * @param index - position
+     * @return - value at the bit position
+     */
+    public boolean get(int index) {
+      return (data[index >>> 6] & (1L << index)) != 0;
+    }
+
+    /**
+     * Number of bits
+     */
+    public long bitSize() {
+      return (long) data.length * Long.SIZE;
+    }
+
+    public long[] getData() {
+      return data;
+    }
+
+    /**
+     * Combines the two BitArrays using bitwise OR.
+     */
+    public void putAll(BitSet array) {
+      assert data.length == array.data.length :
+          "BitArrays must be of equal length (" + data.length + "!= " + 
array.data.length + ")";
+      for (int i = 0; i < data.length; i++) {
+        data[i] |= array.data[i];
+      }
+    }
+
+    /**
+     * Clear the bit set.
+     */
+    public void clear() {
+      Arrays.fill(data, 0);
+    }
+  }
+}
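
For sizing, optimalNumOfBits computes m = -n * ln(p) / (ln 2)^2 and
optimalNumOfHashFunctions picks k = max(1, round(m/n * ln 2)), after which the
constructor rounds m up to a multiple of 64. For example, n = 10,000 entries
at the default p = 0.05 give m = 62,352 bits, rounded to 62,400, and k = 4. A
usage sketch, assuming nothing beyond the class above (the demo class is
hypothetical, not part of the commit):

    import org.apache.orc.util.BloomFilter;

    public class BloomFilterDemo {
      public static void main(String[] args) {
        BloomFilter bf = new BloomFilter(10_000, 0.05);
        bf.addLong(42L);
        bf.addString("orc");                    // hashed in the default charset
        System.out.println(bf.testLong(42L));   // true: no false negatives
        System.out.println(bf);                 // "m: 62400 k: 4"
        // A value never added is usually rejected, but can report true with
        // probability near the configured 5% false positive rate.
        System.out.println(bf.testString("parquet"));
      }
    }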

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilterIO.java b/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
new file mode 100644
index 0000000..ebd8c49
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.nio.ByteBuffer;
+
+public class BloomFilterIO {
+
+  private BloomFilterIO() {
+    // never called
+  }
+
+  /**
+   * Deserialize a bloom filter from the ORC file.
+   */
+  public static BloomFilter deserialize(OrcProto.Stream.Kind kind,
+                                        OrcFile.WriterVersion fileVersion,
+                                        TypeDescription.Category type,
+                                        OrcProto.BloomFilter bloomFilter) {
+    if (bloomFilter == null) {
+      return null;
+    }
+    int numFuncs = bloomFilter.getNumHashFunctions();
+    switch (kind) {
+      case BLOOM_FILTER: {
+        long[] values = new long[bloomFilter.getBitsetCount()];
+        for (int i = 0; i < values.length; ++i) {
+          values[i] = bloomFilter.getBitset(i);
+        }
+        // After HIVE-12055 the bloom filters for strings correctly use
+        // UTF8.
+        if (fileVersion.includes(OrcFile.WriterVersion.HIVE_12055) &&
+            (type == TypeDescription.Category.STRING ||
+             type == TypeDescription.Category.CHAR ||
+             type == TypeDescription.Category.VARCHAR)) {
+          return new BloomFilterUtf8(values, numFuncs);
+        }
+        return new BloomFilter(values, numFuncs);
+      }
+      case BLOOM_FILTER_UTF8: {
+        ByteString bits = bloomFilter.getUtf8Bitset();
+        long[] values = new long[bits.size() / 8];
+        bits.asReadOnlyByteBuffer().asLongBuffer().get(values);
+        return new BloomFilterUtf8(values, numFuncs);
+      }
+      default:
+        throw new IllegalArgumentException("Unknown bloom filter kind " + 
kind);
+    }
+  }
+
+  /**
+   * Serialize the BloomFilter to the ORC file.
+   * @param builder the builder to write to
+   * @param bloomFilter the bloom filter to serialize
+   */
+  public static void serialize(OrcProto.BloomFilter.Builder builder,
+                               BloomFilter bloomFilter) {
+    builder.clear();
+    builder.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+    long[] bitset = bloomFilter.getBitSet();
+    if (bloomFilter instanceof BloomFilterUtf8) {
+      ByteBuffer buffer = ByteBuffer.allocate(bitset.length * 8);
+      buffer.asLongBuffer().put(bitset);
+      builder.setUtf8Bitset(ByteString.copyFrom(buffer));
+    } else {
+      for(int i=0; i < bitset.length; ++i) {
+        builder.addBitset(bitset[i]);
+      }
+    }
+  }
+}
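
deserialize dispatches on the stream kind: legacy BLOOM_FILTER streams over
string columns decode to BloomFilterUtf8 only when the writer already used
UTF-8 (HIVE-12055 or later), while the new BLOOM_FILTER_UTF8 streams always
do; serialize packs a BloomFilterUtf8 bitset into a single ByteString instead
of repeated int64 values. A round-trip sketch, assuming only the API above
plus the generated OrcProto classes:

    // Write a filter out through the protobuf builder and read it back.
    BloomFilterUtf8 original = new BloomFilterUtf8(1000, BloomFilter.DEFAULT_FPP);
    original.addString("ORC-101");

    OrcProto.BloomFilter.Builder builder = OrcProto.BloomFilter.newBuilder();
    BloomFilterIO.serialize(builder, original);   // takes the setUtf8Bitset path

    BloomFilter copy = BloomFilterIO.deserialize(
        OrcProto.Stream.Kind.BLOOM_FILTER_UTF8,   // the new stream kind
        OrcFile.WriterVersion.ORC_101,
        TypeDescription.Category.STRING,
        builder.build());
    assert copy.testString("ORC-101");            // survives the round trip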

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java b/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
new file mode 100644
index 0000000..aad4fab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * This class implements the ORC-101 fix: the bloom filter always hashes
+ * strings as UTF-8 bytes instead of the JVM's default character set.
+ */
+public class BloomFilterUtf8 extends BloomFilter {
+
+  public BloomFilterUtf8(long expectedEntries, double fpp) {
+    super(expectedEntries, fpp);
+  }
+
+  public BloomFilterUtf8(long[] bits, int numFuncs) {
+    super(bits, numFuncs);
+  }
+
+  @Override
+  public void addString(String val) {
+    if (val == null) {
+      add(null);
+    } else {
+      add(val.getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
+  @Override
+  public boolean testString(String val) {
+    if (val == null) {
+      return test(null);
+    } else {
+      return test(val.getBytes(StandardCharsets.UTF_8));
+    }
+  }
+}
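
The only behavioral change here is pinning the string encoding. A short
illustration (not from the commit) of the failure mode that closes:

    // On a JVM whose default charset is not UTF-8 (e.g. ISO-8859-1), the
    // same string encodes to different bytes, and therefore to different
    // Murmur3 hashes, than on a UTF-8 JVM.
    byte[] platformBytes = "café".getBytes(Charset.defaultCharset()); // platform-dependent
    byte[] utf8Bytes = "café".getBytes(StandardCharsets.UTF_8);       // always 5 bytes
    // A legacy BloomFilter populated from platformBytes on one machine can
    // reject utf8Bytes probes built on another, producing a false negative;
    // a bloom filter must never produce false negatives. BloomFilterUtf8
    // makes the writer and reader agree on the bytes by construction.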

http://git-wip-us.apache.org/repos/asf/orc/blob/9d39cb80/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index af20d1f..5ef0ced 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -1904,8 +1904,8 @@ public class TestVectorOrcFile {
             .withZeroCopy(false)
             .build());
     OrcIndex index =
-        meta.readRowIndex(reader.getStripes().get(0), null, null, null, null,
-            null);
+        meta.readRowIndex(reader.getStripes().get(0), null, null, false, null, null,
+            null, OrcFile.WriterVersion.ORC_101, null, null);
     // check the primitive columns to make sure they have the right number of
     // items in the first row group
     for(int c=1; c < 9; ++c) {
