Repository: hive
Updated Branches:
  refs/heads/master f122e258b -> 39ed52c48
HIVE-20635: VectorizedOrcAcidRowBatchReader doesn't filter delete events for original files (Saurabh Seth via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39ed52c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39ed52c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39ed52c4

Branch: refs/heads/master
Commit: 39ed52c48d6970e4ae83d423fe6cf5ced914a69c
Parents: f122e25
Author: Saurabh Seth <saurabh.s...@gmail.com>
Authored: Sat Oct 6 14:10:48 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Sat Oct 6 14:10:48 2018 -0700

----------------------------------------------------------------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |  83 ++++++++-
 .../TestVectorizedOrcAcidRowBatchReader.java    | 177 +++++++++++++++++++
 2 files changed, 252 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 0cefeee..66280b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -228,6 +228,8 @@ public class VectorizedOrcAcidRowBatchReader
     LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString()
             + ":" + orcSplit);
 
+    this.syntheticProps = orcSplit.getSyntheticAcidProps();
+
     // Clone readerOptions for deleteEvents.
     Reader.Options deleteEventReaderOptions = readerOptions.clone();
     // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
@@ -257,7 +259,6 @@ public class VectorizedOrcAcidRowBatchReader
     }
     rowIdProjected = areRowIdsProjected(rbCtx);
     rootPath = orcSplit.getRootDir();
-    syntheticProps = orcSplit.getSyntheticAcidProps();
 
     /**
      * This could be optimized by moving dir type/write id based checks are
@@ -393,6 +394,13 @@ public class VectorizedOrcAcidRowBatchReader
       LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false");
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
+
+    //todo: since we already have OrcSplit.orcTail, should somehow use it to
+    // get the acid.index, stats, etc rather than fetching the footer again
+    // though it seems that orcTail is mostly null....
+    Reader reader = OrcFile.createReader(orcSplit.getPath(),
+        OrcFile.readerOptions(conf));
+
     if(orcSplit.isOriginal()) {
       /**
        * Among originals we may have files with _copy_N suffix.  To properly
@@ -403,14 +411,11 @@ public class VectorizedOrcAcidRowBatchReader
        * Kind of chicken-and-egg - deal with this later.
        * See {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int,
       * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/
-      LOG.debug("findMinMaxKeys(original split) - ignoring");
-      return new OrcRawRecordMerger.KeyInterval(null, null);
+      LOG.debug("findMinMaxKeys(original split)");
+
+      return findOriginalMinMaxKeys(orcSplit, reader, deleteEventReaderOptions);
     }
-    //todo: since we already have OrcSplit.orcTail, should somehow use it to
-    // get the acid.index, stats, etc rather than fetching the footer again
-    // though it seems that orcTail is mostly null....
-    Reader reader = OrcFile.createReader(orcSplit.getPath(),
-        OrcFile.readerOptions(conf));
+
     List<StripeInformation> stripes = reader.getStripes();
     final long splitStart = orcSplit.getStart();
     final long splitEnd = splitStart + orcSplit.getLength();
@@ -578,6 +583,68 @@ public class VectorizedOrcAcidRowBatchReader
     return keyInterval;
   }
 
+  private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, Reader reader,
+      Reader.Options deleteEventReaderOptions) {
+
+    // This method returns the minimum and maximum synthetic row ids that are present in this
+    // split. Both the min and the max key are inclusive when filtering out delete delta records.
+
+    if (syntheticProps == null) {
+      // syntheticProps, which carries the synthetic row id offset, is only computed if there are
+      // delete delta files. If there aren't any delete delta files, we don't need this anyway.
+      return new OrcRawRecordMerger.KeyInterval(null, null);
+    }
+
+    long splitStart = orcSplit.getStart();
+    long splitEnd = orcSplit.getStart() + orcSplit.getLength();
+
+    long minRowId = syntheticProps.getRowIdOffset();
+    long maxRowId = syntheticProps.getRowIdOffset();
+
+    for(StripeInformation stripe: reader.getStripes()) {
+      if (splitStart > stripe.getOffset()) {
+        // This stripe starts before the current split starts. This stripe is not included in this split.
+        minRowId += stripe.getNumberOfRows();
+      }
+
+      if (splitEnd > stripe.getOffset()) {
+        // This stripe starts before the current split ends.
+        maxRowId += stripe.getNumberOfRows();
+      } else {
+        // The split ends before (or exactly where) this stripe starts.
+        // Remaining stripes are not included in this split.
+        break;
+      }
+    }
+
+    RecordIdentifier minKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+        syntheticProps.getBucketProperty(), minRowId);
+
+    RecordIdentifier maxKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+        syntheticProps.getBucketProperty(), maxRowId > 0 ? maxRowId - 1 : 0);
+
+    OrcRawRecordMerger.KeyInterval keyIntervalTmp = new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
+
+    if (minRowId >= maxRowId) {
+      /**
+       * The split lies entirely within a single stripe. In this case, the reader for this split
+       * will not read any data; see {@link org.apache.orc.impl.RecordReaderImpl}.
+       * We can return the min/max key interval as is (it will not read any of the delete delta
+       * records into memory).
+       */
+      LOG.info("findOriginalMinMaxKeys(): This split starts and ends in the same stripe.");
+    }
+
+    LOG.info("findOriginalMinMaxKeys(): " + keyIntervalTmp);
+
+    // Using the min/max ROW__ID from the original file works for PPD to the delete deltas
+    // because the write id is the same in the min and the max ROW__ID.
+    setSARG(keyIntervalTmp, deleteEventReaderOptions, minKey.getBucketProperty(), maxKey.getBucketProperty(),
+        minKey.getRowId(), maxKey.getRowId());
+
+    return keyIntervalTmp;
+  }
+
   /**
    * See {@link #next(NullWritable, VectorizedRowBatch)} first and
    * {@link OrcRawRecordMerger.OriginalReaderPair}.
http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 8f477f4..0a499b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.File;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.Columniz
 import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.io.LongWritable;
@@ -63,6 +66,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
   private FileSystem fs;
   private Path root;
   private ObjectInspector inspector;
+  private ObjectInspector originalInspector;
 
   public static class DummyRow {
     LongWritable field;
@@ -88,6 +92,24 @@ public class TestVectorizedOrcAcidRowBatchReader {
 
   }
 
+  /**
+   * Dummy row for original files.
+   */
+  public static class DummyOriginalRow {
+    LongWritable field;
+
+    DummyOriginalRow(long val) {
+      field = new LongWritable(val);
+    }
+
+    static String getColumnNamesProperty() {
+      return "field";
+    }
+    static String getColumnTypesProperty() {
+      return "bigint";
+    }
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new JobConf();
@@ -110,6 +132,9 @@ public class TestVectorizedOrcAcidRowBatchReader {
     synchronized (TestOrcFile.class) {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+      originalInspector = ObjectInspectorFactory.getReflectionObjectInspector(DummyOriginalRow.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
   }
   @Test
@@ -370,6 +395,158 @@ public class TestVectorizedOrcAcidRowBatchReader {
     }
   }
 
+
+  @Test
+  public void testDeleteEventOriginalFilteringOn() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    testDeleteEventOriginalFiltering();
+  }
+
+  @Test
+  public void testDeleteEventOriginalFilteringOff() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+    testDeleteEventOriginalFiltering();
+  }
+
+  public void testDeleteEventOriginalFiltering() throws Exception {
+    boolean filterOn =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+
+    // Create 3 original files with 3 rows each.
+    Properties properties = new Properties();
+    properties.setProperty("columns", DummyOriginalRow.getColumnNamesProperty());
+    properties.setProperty("columns.types", DummyOriginalRow.getColumnTypesProperty());
+
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(properties, conf);
+    writerOptions.inspector(originalInspector);
+
+    Path testFilePath = new Path(root, "000000_0");
+    Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    testFilePath = new Path(root, "000000_0_copy_1");
+
+    writer = OrcFile.createWriter(testFilePath, writerOptions);
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    testFilePath = new Path(root, "000000_0_copy_2");
+
+    writer = OrcFile.createWriter(testFilePath, writerOptions);
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+
+    int bucket = 0;
+
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(bucket)
+        .writingBase(false)
+        .minimumWriteId(1)
+        .maximumWriteId(1)
+        .inspector(inspector)
+        .reporter(Reporter.NULL)
+        .recordIdColumn(1)
+        .finalDestination(root);
+
+    int bucketProperty = BucketCodec.V1.encode(options);
+
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+
+    // Delete 1 row from each of the original files.
+    // Delete the last record in this split to test boundary conditions. It should not be present
+    // in the delete event registry for the next split.
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 2, 0, bucket));
+    // Delete the first record in this split to test boundary conditions.
+    // It should not be present in the delete event registry for the previous split.
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 3, 0, bucket));
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 7, 0, bucket));
+    updater.close(false);
+
+    // HWM is not important - just make sure deltas created above are read as if committed.
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:2:" + Long.MAX_VALUE + "::");
+
+    // Set vector mode to true in the map work so that we recognize this as a vector mode
+    // execution during split generation. Without this we will not compute the offset for the
+    // synthetic row ids.
+    MapWork mapWork = new MapWork();
+    mapWork.setVectorMode(true);
+    VectorizedRowBatchCtx vrbContext = new VectorizedRowBatchCtx();
+    mapWork.setVectorizedRowBatchCtx(vrbContext);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+    Utilities.setMapWork(conf, mapWork);
+
+    // Now we have 3 delete events total, but for each split we should only
+    // load 1 into the DeleteRegistry (if filtering is on).
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    assertEquals(3, splits.size());
+    assertEquals(root.toUri().toString() + File.separator + "000000_0",
+        splits.get(0).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_1",
+        splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(1).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_2",
+        splits.get(2).getPath().toUri().toString());
+    assertTrue(splits.get(2).isOriginal());
+
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, vrbContext);
+    ColumnizedDeleteEventRegistry deleteEventRegistry =
+        (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 1", filterOn ? 1 : 3, deleteEventRegistry.size());
+    OrcRawRecordMerger.KeyInterval keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(0, bucketProperty, 0),
+          new RecordIdentifier(0, bucketProperty, 2)),
+          keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(1), conf, Reporter.NULL, vrbContext);
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 2", filterOn ? 1 : 3, deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(0, bucketProperty, 3),
+          new RecordIdentifier(0, bucketProperty, 5)),
+          keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(2), conf, Reporter.NULL, vrbContext);
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 3", filterOn ?
+        1 : 3, deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(0, bucketProperty, 6),
+          new RecordIdentifier(0, bucketProperty, 8)), keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+  }
+
   @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
     conf.set("bucket_count", "1");
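----------------------------------------------------------------------

For reference, the row id arithmetic that findOriginalMinMaxKeys() performs can be sketched in isolation. The following is a minimal, self-contained illustration and is not part of the patch: the Stripe class and the minMaxRowIds() helper are simplified stand-ins for ORC's StripeInformation and for the synthetic-props row id offset, chosen only to make the loop runnable on its own.

// Illustration only: mirrors the stripe-walking loop the patch adds.
// Stripes that start before the split start push the min forward; stripes
// that start before the split end push the max forward; the first stripe at
// or past the split end ends the scan. The returned max is inclusive.
import java.util.Arrays;
import java.util.List;

public class SyntheticRowIdSketch {

  // Simplified stand-in for org.apache.orc.StripeInformation.
  static final class Stripe {
    final long offset;   // byte offset of the stripe in the file
    final long numRows;  // number of rows in the stripe
    Stripe(long offset, long numRows) { this.offset = offset; this.numRows = numRows; }
  }

  // Returns {minRowId, maxRowId} for a split covering bytes [splitStart, splitEnd),
  // where rowIdOffset is the synthetic row id of the file's first row.
  static long[] minMaxRowIds(List<Stripe> stripes, long rowIdOffset,
                             long splitStart, long splitEnd) {
    long minRowId = rowIdOffset;
    long maxRowId = rowIdOffset;
    for (Stripe stripe : stripes) {
      if (splitStart > stripe.offset) {
        minRowId += stripe.numRows;  // stripe precedes this split entirely
      }
      if (splitEnd > stripe.offset) {
        maxRowId += stripe.numRows;  // stripe starts before this split ends
      } else {
        break;                       // remaining stripes lie past this split
      }
    }
    return new long[] { minRowId, maxRowId > 0 ? maxRowId - 1 : 0 };
  }

  public static void main(String[] args) {
    // Three stripes of 3 rows each at byte offsets 0, 100, 200. A split covering
    // only the second stripe should see synthetic row ids 3 through 5.
    List<Stripe> stripes = Arrays.asList(
        new Stripe(0, 3), new Stripe(100, 3), new Stripe(200, 3));
    long[] keys = minMaxRowIds(stripes, 0L, 100, 200);
    System.out.println("minRowId=" + keys[0] + " maxRowId=" + keys[1]); // prints 3 and 5
  }
}

Under these assumptions the sketch reproduces the interval the test above expects for the second split, RecordIdentifier(0, bucketProperty, 3) through (0, bucketProperty, 5); the write id and bucket property are constant across a single original file, which is why pushing just the row id bounds into the SARG is sufficient for delete-delta PPD.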