HIVE-20730: Do delete event filtering even if hive.acid.index is not there (Saurabh Seth via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6fbdf374
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6fbdf374
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6fbdf374

Branch: refs/heads/master
Commit: 6fbdf374888ee03ff73a4b04dfcde496f954c458
Parents: 232a028
Author: Saurabh Seth <saurabh.s...@gmail.com>
Authored: Wed Nov 14 14:57:59 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Wed Nov 14 14:57:59 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  8 ++-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 54 ++++++++++++++++----
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  2 +
 .../TestVectorizedOrcAcidRowBatchReader.java    | 24 +++++++--
 5 files changed, 74 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 65264f3..6ca1315 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1905,6 +1905,8 @@ public class HiveConf extends Configuration {
     TESTMODE_BUCKET_CODEC_VERSION("hive.test.bucketcodec.version", 1,
         "For testing only. Will make ACID subsystem write RecordIdentifier.bucketId in specified\n" +
         "format", false),
+    HIVETESTMODEACIDKEYIDXSKIP("hive.test.acid.key.index.skip", false, "For testing only. OrcRecordUpdater will skip " +
+        "generation of the hive.acid.key.index", false),

     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
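The new entry above is a test-only switch. To exercise the reader against delete deltas written without the key index, a test would flip it together with the existing delete-event filtering flag, roughly as follows (a minimal sketch; only the two ConfVars are from this commit, the surrounding table and writer setup is assumed):

    // Sketch: configure a test HiveConf so OrcRecordUpdater skips writing the
    // hive.acid.key.index footer metadata while delete-event filtering stays on.
    HiveConf conf = new HiveConf();
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP, true);
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);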
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 4ebd69e..6d4578e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -347,7 +347,9 @@ public class OrcRecordUpdater implements RecordUpdater {
             writerOptions.getConfiguration().set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "-1.0");
       }
     }
-    writerOptions.fileSystem(fs).callback(indexBuilder);
+    if(!HiveConf.getBoolVar(options.getConfiguration(), HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP)) {
+      writerOptions.fileSystem(fs).callback(indexBuilder);
+    }
     rowInspector = (StructObjectInspector)options.getInspector();
     writerOptions.inspector(createEventObjectInspector(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
@@ -621,6 +623,10 @@ public class OrcRecordUpdater implements RecordUpdater {
   static RecordIdentifier[] parseKeyIndex(Reader reader) {
     String[] stripes;
     try {
+      if (!reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
+        return null;
+      }
+
       ByteBuffer val =
           reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
               .duplicate();
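For context, hive.acid.key.index is ORC user metadata that records the last ROW__ID of each stripe as semicolon-separated writeId,bucketProperty,rowId triples (the mocked value "10,20,30;40,50,60;40,50,61" in TestOrcRawRecordMerger below has exactly that shape). With the guard added here, parseKeyIndex() returns null instead of failing when that metadata key is missing, so callers are expected to null-check and fall back, along these lines (a hedged sketch; the reader variable and the fallback comment are illustrative, not code from this commit):

    // Sketch: null-safe use of the key index on the read side.
    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
    if (keyIndex == null) {
      // No hive.acid.key.index in the file footer (e.g. written with
      // hive.test.acid.key.index.skip=true, or by a non-Hive ORC writer);
      // fall back to stripe statistics as findMinMaxKeys() below does.
    }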
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 2f809de..6d1ca722 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -473,10 +473,16 @@ public class VectorizedOrcAcidRowBatchReader
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
     RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
-    if(keyIndex == null || keyIndex.length != stripes.size()) {
-      LOG.warn("Could not find keyIndex or length doesn't match (" +
-          firstStripeIndex + "," + lastStripeIndex + "," + stripes.size() + "," +
-          (keyIndex == null ? -1 : keyIndex.length) + ")");
+
+    if(keyIndex == null) {
+      LOG.warn("Could not find keyIndex (" + firstStripeIndex + "," +
+          lastStripeIndex + "," + stripes.size() + ")");
+    }
+
+    if(keyIndex != null && keyIndex.length != stripes.size()) {
+      LOG.warn("keyIndex length doesn't match (" +
+          firstStripeIndex + "," + lastStripeIndex + "," + stripes.size() +
+          "," + keyIndex.length + ")");
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
     /**
@@ -490,17 +496,19 @@ public class VectorizedOrcAcidRowBatchReader
     if(!columnStatsPresent) {
       LOG.debug("findMinMaxKeys() No ORC column stats");
     }
+
+    List<StripeStatistics> stats = reader.getStripeStatistics();
+    assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
+        " sta.s=" + stats.size();
+
     RecordIdentifier minKey = null;
-    if(firstStripeIndex > 0) {
+    if(firstStripeIndex > 0 && keyIndex != null) {
       //valid keys are strictly > than this key
       minKey = keyIndex[firstStripeIndex - 1];
       //add 1 to make comparison >= to match the case of 0th stripe
       minKey.setRowId(minKey.getRowId() + 1);
     }
     else {
-      List<StripeStatistics> stats = reader.getStripeStatistics();
-      assert stripes.size() == stats.size() : "str.s=" + stripes.size() +
-          " sta.s=" + stats.size();
       if(columnStatsPresent) {
         ColumnStatistics[] colStats =
             stats.get(firstStripeIndex).getColumnStatistics();
@@ -520,15 +528,40 @@ public class VectorizedOrcAcidRowBatchReader
         //we may want to change bucketProperty from int to long in the
         // future(across the stack) this protects the following cast to int
         assert bucketProperty.getMinimum() <= Integer.MAX_VALUE :
-            "was bucketProper changed to a long (" +
+            "was bucketProperty changed to a long (" +
                 bucketProperty.getMinimum() + ")?!:" + orcSplit;
         //this a lower bound but not necessarily greatest lower bound
         minKey = new RecordIdentifier(origWriteId.getMinimum(),
             (int) bucketProperty.getMinimum(), rowId.getMinimum());
       }
     }
+
+    RecordIdentifier maxKey = null;
+
+    if (keyIndex != null) {
+      maxKey = keyIndex[lastStripeIndex];
+    } else {
+      if(columnStatsPresent) {
+        ColumnStatistics[] colStats =
+            stats.get(lastStripeIndex).getColumnStatistics();
+        IntegerColumnStatistics origWriteId = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
+        IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.BUCKET + 1];
+        IntegerColumnStatistics rowId = (IntegerColumnStatistics)
+            colStats[OrcRecordUpdater.ROW_ID + 1];
+
+        assert bucketProperty.getMaximum() <= Integer.MAX_VALUE :
+            "was bucketProperty changed to a long (" +
+                bucketProperty.getMaximum() + ")?!:" + orcSplit;
+
+        // this is an upper bound but not necessarily the least upper bound
+        maxKey = new RecordIdentifier(origWriteId.getMaximum(),
+            (int) bucketProperty.getMaximum(), rowId.getMaximum());
+      }
+    }
     OrcRawRecordMerger.KeyInterval keyInterval =
-        new OrcRawRecordMerger.KeyInterval(minKey, keyIndex[lastStripeIndex]);
+        new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
     LOG.info("findMinMaxKeys(): " + keyInterval +
         " stripes(" + firstStripeIndex + "," + lastStripeIndex + ")");

@@ -551,7 +584,6 @@ public class VectorizedOrcAcidRowBatchReader
      * So use stripe stats to find proper min/max for bucketProp and rowId
      * writeId is the same in both cases
      */
-    List<StripeStatistics> stats = reader.getStripeStatistics();
     for(int i = firstStripeIndex; i <= lastStripeIndex; i++) {
       ColumnStatistics[] colStats = stats.get(firstStripeIndex)
           .getColumnStatistics();
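When the key index is absent, both ends of the interval now come from stripe-level column statistics of the ROW__ID fields, which gives correct but potentially looser bounds (an upper bound rather than the exact last key, as the TestVectorizedOrcAcidRowBatchReader change below asserts). A simplified sketch of the fallback for the upper bound, assuming an ORC Reader named reader and a stripe index lastStripeIndex (an illustration, not the exact method body):

    // Illustration: derive a conservative upper bound for the delete-event
    // key interval from the last stripe's column statistics.
    List<StripeStatistics> stats = reader.getStripeStatistics();
    ColumnStatistics[] colStats = stats.get(lastStripeIndex).getColumnStatistics();
    IntegerColumnStatistics writeId =
        (IntegerColumnStatistics) colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1];
    IntegerColumnStatistics bucket =
        (IntegerColumnStatistics) colStats[OrcRecordUpdater.BUCKET + 1];
    IntegerColumnStatistics rowId =
        (IntegerColumnStatistics) colStats[OrcRecordUpdater.ROW_ID + 1];
    // Every key in the split is <= maxKey, but maxKey need not equal the actual last key.
    RecordIdentifier maxKey = new RecordIdentifier(
        writeId.getMaximum(), (int) bucket.getMaximum(), rowId.getMaximum());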
http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index cc29384..bc03620 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -434,6 +434,8 @@ public class TestOrcRawRecordMerger {
     Mockito.when(recordReader.next(row2)).thenReturn(row3);
     Mockito.when(recordReader.next(row3)).thenReturn(row5);

+    Mockito.when(reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
+        .thenReturn(true);
     Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
         .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
             .getBytes("UTF-8")));

http://git-wip-us.apache.org/repos/asf/hive/blob/6fbdf374/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 0b26879..99317cc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -363,10 +363,18 @@ public class TestVectorizedOrcAcidRowBatchReader {
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
     testDeleteEventFiltering2();
   }
+  @Test
+  public void testDeleteEventFilteringOnWithoutIdx2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP, true);
+    testDeleteEventFiltering2();
+  }
   private void testDeleteEventFiltering2() throws Exception {
     boolean filterOn =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+    boolean skipKeyIdx =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVETESTMODEACIDKEYIDXSKIP);
     int bucket = 1;
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .filesystem(fs)
@@ -430,10 +438,18 @@ public class TestVectorizedOrcAcidRowBatchReader {
         vectorizedReader.getKeyInterval();
     SearchArgument sarg = vectorizedReader.getDeleteEventSarg();
     if(filterOn) {
-      assertEquals(new OrcRawRecordMerger.KeyInterval(
-          new RecordIdentifier(0, bucketProperty, 0),
-          new RecordIdentifier(10000001, bucketProperty, 0)),
-          keyInterval);
+      if (skipKeyIdx) {
+        // If key index is not present, the min max key interval uses stripe stats instead
+        assertEquals(new OrcRawRecordMerger.KeyInterval(
+            new RecordIdentifier(0, bucketProperty, 0),
+            new RecordIdentifier(10000001, bucketProperty, 2)),
+            keyInterval);
+      } else {
+        assertEquals(new OrcRawRecordMerger.KeyInterval(
+            new RecordIdentifier(0, bucketProperty, 0),
+            new RecordIdentifier(10000001, bucketProperty, 0)),
+            keyInterval);
+      }
      //key point is that in leaf-5 is (rowId <= 2) even though maxKey has
      //rowId 0. more in VectorizedOrcAcidRowBatchReader.findMinMaxKeys
      assertEquals( "leaf-0 = (LESS_THAN originalTransaction 0)," +