HIVE-20730: Do delete event filtering even if hive.acid.key.index is not there (Saurabh Seth via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6fbdf374
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6fbdf374
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6fbdf374

Branch: refs/heads/master
Commit: 6fbdf374888ee03ff73a4b04dfcde496f954c458
Parents: 232a028
Author: Saurabh Seth <saurabh.s...@gmail.com>
Authored: Wed Nov 14 14:57:59 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Wed Nov 14 14:57:59 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  8 ++-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 54 ++++++++++++++++----
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  2 +
 .../TestVectorizedOrcAcidRowBatchReader.java    | 24 +++++++--
 5 files changed, 74 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 65264f3..6ca1315 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1905,6 +1905,8 @@ public class HiveConf extends Configuration {
     TESTMODE_BUCKET_CODEC_VERSION("hive.test.bucketcodec.version", 1,
       "For testing only.  Will make ACID subsystem write 
RecordIdentifier.bucketId in specified\n" +
         "format", false),
+    HIVETESTMODEACIDKEYIDXSKIP("hive.test.acid.key.index.skip", false, "For testing only. OrcRecordUpdater will skip "
+        + "generation of the hive.acid.key.index", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),

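A minimal sketch of how a test might toggle this new test-only flag (the
class and main() harness here are hypothetical; only the ConfVar itself
comes from the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class KeyIndexSkipExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Test-only switch: OrcRecordUpdater will skip registering the
        // KeyIndexBuilder callback, so no hive.acid.key.index metadata
        // is written to the delta file.
        HiveConf.setBoolVar(conf,
            HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP, true);
        System.out.println(HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP));
      }
    }
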
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 4ebd69e..6d4578e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -347,7 +347,9 @@ public class OrcRecordUpdater implements RecordUpdater {
         writerOptions.getConfiguration().set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "-1.0");
       }
     }
-    writerOptions.fileSystem(fs).callback(indexBuilder);
+    if(!HiveConf.getBoolVar(options.getConfiguration(), HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP)) {
+      writerOptions.fileSystem(fs).callback(indexBuilder);
+    }
     rowInspector = (StructObjectInspector)options.getInspector();
     writerOptions.inspector(createEventObjectInspector(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
@@ -621,6 +623,10 @@ public class OrcRecordUpdater implements RecordUpdater {
   static RecordIdentifier[] parseKeyIndex(Reader reader) {
     String[] stripes;
     try {
+      if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+        return null;
+      }
+
       ByteBuffer val =
           reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
               .duplicate();

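Context for the new guard: ORC's Reader.getMetadataValue() throws
IllegalArgumentException when the requested key is absent, so checking
hasMetadataValue() first turns a missing index into a plain null return.
A sketch of the resulting caller-side pattern (the caller shown is
hypothetical; parseKeyIndex() and the types are from the patch):

    // keyIndex may legitimately be null for delta files that were
    // written without the hive.acid.key.index metadata entry.
    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
    if (keyIndex == null) {
      // fall back to ORC stripe/column statistics, as
      // VectorizedOrcAcidRowBatchReader.findMinMaxKeys() does below
    }
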
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 2f809de..6d1ca722 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -473,10 +473,16 @@ public class VectorizedOrcAcidRowBatchReader
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
     RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
-    if(keyIndex == null || keyIndex.length != stripes.size()) {
-      LOG.warn("Could not find keyIndex or length doesn't match (" +
-          firstStripeIndex + "," + lastStripeIndex + "," + stripes.size() + "," +
-          (keyIndex == null ? -1 : keyIndex.length) + ")");
+
+    if(keyIndex == null) {
+      LOG.warn("Could not find keyIndex (" + firstStripeIndex + "," +
+          lastStripeIndex + "," + stripes.size() + ")");
+    }
+
+    if(keyIndex != null && keyIndex.length != stripes.size()) {
+      LOG.warn("keyIndex length doesn't match (" +
+          firstStripeIndex + "," + lastStripeIndex + "," + stripes.size() +
+          "," + keyIndex.length + ")");
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
     /**
@@ -490,17 +496,19 @@ public class VectorizedOrcAcidRowBatchReader
     if(!columnStatsPresent) {
       LOG.debug("findMinMaxKeys() No ORC column stats");
     }
+
+    List<StripeStatistics> stats = reader.getStripeStatistics();
+    assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
+        " sta.s=" + stats.size();
+
     RecordIdentifier minKey = null;
-    if(firstStripeIndex > 0) {
+    if(firstStripeIndex > 0 && keyIndex != null) {
       //valid keys are strictly > than this key
       minKey = keyIndex[firstStripeIndex - 1];
       //add 1 to make comparison >= to match the case of 0th stripe
       minKey.setRowId(minKey.getRowId() + 1);
     }
     else {
-      List<StripeStatistics> stats = reader.getStripeStatistics();
-      assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
-          " sta.s=" + stats.size();
       if(columnStatsPresent) {
         ColumnStatistics[] colStats =
             stats.get(firstStripeIndex).getColumnStatistics();
@@ -520,15 +528,40 @@ public class VectorizedOrcAcidRowBatchReader
         //we may want to change bucketProperty from int to long in the
         // future(across the stack) this protects the following cast to int
         assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
-            "was bucketProper changed to a long (" +
+            "was bucketProperty changed to a long (" +
                 bucketProperty.getMinimum() + ")?!:" + orcSplit;
         //this a lower bound but not necessarily greatest lower bound
         minKey = new RecordIdentifier(origWriteId.getMinimum(),
             (int) bucketProperty.getMinimum(), rowId.getMinimum());
       }
     }
+
+    RecordIdentifier maxKey = null;
+
+    if (keyIndex != null) {
+      maxKey = keyIndex[lastStripeIndex];
+    } else {
+      if(columnStatsPresent) {
+        ColumnStatistics[] colStats =
+            stats.get(lastStripeIndex).getColumnStatistics();
+        IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
+        IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.BUCKET + 1];
+        IntegerColumnStatistics rowId = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.ROW_ID + 1];
+
+        assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
+            "was bucketProperty changed to a long (" +
+                bucketProperty.getMaximum() + ")?!:" + orcSplit;
+
+        // this is an upper bound but not necessarily the least upper bound
+        maxKey = new RecordIdentifier(origWriteId.getMaximum(),
+            (int) bucketProperty.getMaximum(), rowId.getMaximum());
+      }
+    }
     OrcRawRecordMerger.KeyInterval keyInterval =
-        new OrcRawRecordMerger.KeyInterval(minKey, keyIndex[lastStripeIndex]);
+        new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
     LOG.info("findMinMaxKeys(): " + keyInterval +
         " stripes(" + firstStripeIndex + "," + lastStripeIndex + ")");
 
@@ -551,7 +584,6 @@ public class VectorizedOrcAcidRowBatchReader
        * So use stripe stats to find proper min/max for bucketProp and rowId
        * writeId is the same in both cases
        */
-      List<StripeStatistics> stats = reader.getStripeStatistics();
       for(int i = firstStripeIndex; i <= lastStripeIndex; i++) {
         ColumnStatistics[] colStats = stats.get(firstStripeIndex)
             .getColumnStatistics();

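A condensed sketch of the control flow findMinMaxKeys() ends up with after
this patch (not the verbatim code; minKeyFromStats/maxKeyFromStats are
hypothetical stand-ins for the inline column-statistics blocks above, and
keyIndex, stats and columnStatsPresent are the locals from the patch):

    RecordIdentifier minKey = null;
    RecordIdentifier maxKey = null;
    if (keyIndex != null && firstStripeIndex > 0) {
      // exact last key of the previous stripe; +1 so the comparison
      // can be >=, matching the 0th-stripe case
      minKey = keyIndex[firstStripeIndex - 1];
      minKey.setRowId(minKey.getRowId() + 1);
    } else if (columnStatsPresent) {
      // a lower bound, but not necessarily the greatest lower bound
      minKey = minKeyFromStats(stats.get(firstStripeIndex));
    }
    if (keyIndex != null) {
      // exact last key of the split
      maxKey = keyIndex[lastStripeIndex];
    } else if (columnStatsPresent) {
      // an upper bound, but not necessarily the least upper bound
      maxKey = maxKeyFromStats(stats.get(lastStripeIndex));
    }
    return new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
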
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index cc29384..bc03620 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -434,6 +434,8 @@ public class TestOrcRawRecordMerger {
     Mockito.when(recordReader.next(row2)).thenReturn(row3);
     Mockito.when(recordReader.next(row3)).thenReturn(row5);
 
+    Mockito.when(reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
+        .thenReturn(true);
     Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
         .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
             .getBytes("UTF-8")));

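Note on this stub: parseKeyIndex() now calls hasMetadataValue() before
getMetadataValue(), and an unstubbed boolean method on a Mockito mock
returns false by default, which would make parseKeyIndex() return null and
break the existing test. A minimal illustration (the mock here is
hypothetical, not the test's actual setup):

    Reader reader = Mockito.mock(Reader.class);
    // Unstubbed boolean methods default to false on a Mockito mock:
    Assert.assertFalse(
        reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME));
    Mockito.when(reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
        .thenReturn(true);
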
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 0b26879..99317cc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -363,10 +363,18 @@ public class TestVectorizedOrcAcidRowBatchReader {
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
     testDeleteEventFiltering2();
   }
+  @Test
+  public void testDeleteEventFilteringOnWithoutIdx2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP, true);
+    testDeleteEventFiltering2();
+  }
 
   private void testDeleteEventFiltering2() throws Exception {
     boolean filterOn =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+    boolean skipKeyIdx =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP);
     int bucket = 1;
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .filesystem(fs)
@@ -430,10 +438,18 @@ public class TestVectorizedOrcAcidRowBatchReader {
         vectorizedReader.getKeyInterval();
     SearchArgument sarg = vectorizedReader.getDeleteEventSarg();
     if(filterOn) {
-      assertEquals(new OrcRawRecordMerger.KeyInterval(
-              new RecordIdentifier(0, bucketProperty, 0),
-              new RecordIdentifier(10000001, bucketProperty, 0)),
-          keyInterval);
+      if (skipKeyIdx) {
+        // If key index is not present, the min max key interval uses stripe stats instead
+        assertEquals(new OrcRawRecordMerger.KeyInterval(
+                new RecordIdentifier(0, bucketProperty, 0),
+                new RecordIdentifier(10000001, bucketProperty, 2)),
+            keyInterval);
+      } else {
+        assertEquals(new OrcRawRecordMerger.KeyInterval(
+                new RecordIdentifier(0, bucketProperty, 0),
+                new RecordIdentifier(10000001, bucketProperty, 0)),
+            keyInterval);
+      }
       //key point is that in leaf-5 is (rowId <= 2) even though maxKey has
       //rowId 0.  more in VectorizedOrcAcidRowBatchReader.findMinMaxKeys
       assertEquals( "leaf-0 = (LESS_THAN originalTransaction 0)," +

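Why the expected maxKey rowId changes from 0 to 2 in the skip-index case:
the key index records the exact last (writeId, bucket, rowId) key of each
stripe, while the ROW_ID column statistic is the maximum rowId over every
event in the stripe. An illustration (hypothetical stripe contents,
consistent with the existing leaf-5 comment above):

    // events in the stripe, sorted by (originalTransaction, bucket, rowId):
    //   (1,        bucketProperty, 0..2)   <- rowIds up to 2
    //   (10000001, bucketProperty, 0)      <- last key has rowId 0
    // key index maxKey    -> (10000001, bucketProperty, 0)  exact
    // stripe stats maxKey -> (10000001, bucketProperty, 2)  per-column maxima,
    //                        a valid upper bound, just not a tight one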