HIVE-10735: Cached plan race condition - VectorMapJoinCommonOperator has no closeOp() (Matt McCline, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82beb2ba Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82beb2ba Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82beb2ba Branch: refs/heads/hbase-metastore Commit: 82beb2ba3d0b2fa7ef690c4a51d641ced161fce2 Parents: 949ddaa Author: Gunther Hagleitner <gunt...@apache.org> Authored: Wed Jun 3 21:34:33 2015 -0700 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Wed Jun 3 21:34:33 2015 -0700 ---------------------------------------------------------------------- .../persistence/BytesBytesMultiHashMap.java | 48 +++++++------------- .../persistence/MapJoinBytesTableContainer.java | 4 +- .../ql/exec/vector/VectorFilterOperator.java | 5 +- 3 files changed, 21 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/82beb2ba/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 2ba622e..3bba890 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -184,19 +184,6 @@ public final class BytesBytesMultiHashMap { this(initialCapacity, loadFactor, wbSize, -1); } - public class ThreadSafeGetter { - private WriteBuffers.Position position = new WriteBuffers.Position(); - public byte getValueResult(byte[] key, int offset, int length, - BytesBytesMultiHashMap.Result hashMapResult) { - return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position); - } - - public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { - // Convenience method, populateValue is thread-safe. - BytesBytesMultiHashMap.this.populateValue(valueRef); - } - } - /** * The result of looking up a key in the multi-hash map. * @@ -232,6 +219,14 @@ public final class BytesBytesMultiHashMap { public Result() { hasRows = false; byteSegmentRef = new WriteBuffers.ByteSegmentRef(); + readPos = new WriteBuffers.Position(); + } + + /** + * Return the thread-safe read position. + */ + public WriteBuffers.Position getReadPos() { + return readPos; } /** @@ -260,14 +255,11 @@ public final class BytesBytesMultiHashMap { * Whether there are multiple values (true) or just a single value (false). * @param offsetAfterListRecordKeyLen * The offset of just after the key length in the list record. Or, 0 when single row. - * @param readPos - * Holds mutable read position for thread safety. */ public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasList, - long offsetAfterListRecordKeyLen, WriteBuffers.Position readPos) { + long offsetAfterListRecordKeyLen) { this.hashMap = hashMap; - this.readPos = readPos; this.firstOffset = firstOffset; this.hasList = hasList; @@ -410,7 +402,6 @@ public final class BytesBytesMultiHashMap { */ public void forget() { hashMap = null; - readPos = null; byteSegmentRef.reset(0, 0); hasRows = false; readIndex = 0; @@ -481,29 +472,23 @@ public final class BytesBytesMultiHashMap { ++numValues; } - public ThreadSafeGetter createGetterForThread() { - return new ThreadSafeGetter(); - } - - /** Not thread-safe! Use createGetterForThread. */ - public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) { - return getValueResult(key, offset, length, hashMapResult, writeBuffers.getReadPosition()); - } - /** * Finds a key. Values can be read with the supplied result object. * + * Important Note: The caller is expected to pre-allocate the hashMapResult and not + * share it among other threads. + * * @param key Key buffer. * @param offset the offset to the key in the buffer * @param hashMapResult The object to fill in that can read the values. - * @param readPos Holds mutable read position for thread safety. * @return The state byte. */ - private byte getValueResult(byte[] key, int offset, int length, Result hashMapResult, - WriteBuffers.Position readPos) { + public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) { hashMapResult.forget(); + WriteBuffers.Position readPos = hashMapResult.getReadPos(); + // First, find first record for the key. long ref = findKeyRefToRead(key, offset, length, readPos); if (ref == 0) { @@ -515,8 +500,7 @@ public final class BytesBytesMultiHashMap { // This relies on findKeyRefToRead doing key equality check and leaving read ptr where needed. long offsetAfterListRecordKeyLen = hasList ? writeBuffers.getReadPoint(readPos) : 0; - hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen, - readPos); + hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen); return Ref.getStateByte(ref); } http://git-wip-us.apache.org/repos/asf/hive/blob/82beb2ba/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 6deaafc..34b3aa9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -500,7 +500,6 @@ public class MapJoinBytesTableContainer private byte aliasFilter; /** Hash table wrapper specific to the container. */ - private final BytesBytesMultiHashMap.ThreadSafeGetter threadSafeHashMapGetter; private BytesBytesMultiHashMap.Result hashMapResult; /** @@ -520,14 +519,13 @@ public class MapJoinBytesTableContainer valueStruct = null; // No rows? } uselessIndirection = new ByteArrayRef(); - threadSafeHashMapGetter = hashMap.createGetterForThread(); hashMapResult = new BytesBytesMultiHashMap.Result(); clearRows(); } public JoinUtil.JoinResult setFromOutput(Output output) { - aliasFilter = threadSafeHashMapGetter.getValueResult( + aliasFilter = hashMap.getValueResult( output.getData(), 0, output.getLength(), hashMapResult); dummyRow = null; if (hashMapResult.hasRows()) { http://git-wip-us.apache.org/repos/asf/hive/blob/82beb2ba/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index d1b8939..b40564c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -41,7 +41,7 @@ public class VectorFilterOperator extends FilterOperator { private VectorExpression conditionEvaluator = null; // Temporary selected vector - private final int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE]; + private transient int[] temporarySelected; // filterMode is 1 if condition is always true, -1 if always false // and 0 if condition needs to be computed. @@ -77,6 +77,9 @@ public class VectorFilterOperator extends FilterOperator { filterMode = -1; } } + + temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE]; + return result; }