HBASE-19163 "Maximum lock count exceeded" from region server's batch processing
Signed-off-by: Umesh Agashe <uaga...@cloudera.com> Signed-off-by: Michael Stack <st...@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3f911c1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3f911c1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3f911c1 Branch: refs/heads/branch-1.3 Commit: b3f911c1c6381c801d88b1d0fe8f4620860aada0 Parents: e4f46f5 Author: huaxiangsun <huaxiang...@apache.org> Authored: Fri Jan 19 11:22:00 2018 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 18:08:17 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 43 ++++++++++++++++---- .../hbase/client/TestFromClientSide3.java | 27 ++++++++++++ .../hbase/regionserver/TestAtomicOperation.java | 6 +-- 3 files changed, 66 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b3f911c1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 499cfe4..c2ccf83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; +import 
org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -3098,6 +3099,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; + RowLock prevRowLock = null; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; WALKey walKey = null; @@ -3160,7 +3162,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock); + rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock, prevRowLock); + } catch (TimeoutIOException e) { + // We retry on other exceptions, but we must stop if we time out. + throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -3171,7 +3176,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi break; } else { - acquiredRowLocks.add(rowLock); + if (rowLock != prevRowLock) { + // It is a different row now; add this to acquiredRowLocks and + // set prevRowLock to the newly returned rowLock. + acquiredRowLocks.add(rowLock); + prevRowLock = rowLock; + } } lastIndexExclusive++; @@ -3265,7 +3275,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now); // Acquire row locks. If not, the whole batch will fail.
- acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true)); + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true, null)); if (cpMutation.getDurability() == Durability.SKIP_WAL) { recordMutationWithoutWal(cpFamilyMap); @@ -5360,17 +5370,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); - return getRowLockInternal(row, readLock, waitForLock); + return getRowLockInternal(row, readLock, waitForLock, null); } // getRowLock calls checkRow. Call this to skip checkRow. protected RowLock getRowLockInternal(byte[] row) throws IOException { - return getRowLockInternal(row, false, true); + return getRowLockInternal(row, false, true, null); } - protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock) - throws IOException { + protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock, + final RowLock prevRowLock) throws IOException { // create an object to use a a key in the row lock map HashedBytes rowKey = new HashedBytes(row); @@ -5404,6 +5414,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // // This can fail as if (readLock) { + // For read lock, if the caller has locked the same row previously, it will not try + // to acquire the same read lock. It simply returns the previous row lock. 
+ RowLockImpl prevRowLockImpl = (RowLockImpl)prevRowLock; + if ((prevRowLockImpl != null) && (prevRowLockImpl.getLock() == + rowLockContext.readWriteLock.readLock())) { + success = true; + return prevRowLock; + } result = rowLockContext.newReadLock(); } else { result = rowLockContext.newWriteLock(); @@ -5441,6 +5459,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Thread.currentThread().interrupt(); throw iie; + } catch (Error error) { + // The maximum lock count for read lock is 64K (hardcoded), when this maximum count + // is reached, it will throw out an Error. This Error needs to be caught so it can + // go ahead to process the minibatch with lock acquired. + LOG.warn("Error to get row lock for " + Bytes.toStringBinary(row) + ", cause: " + error); + IOException ioe = new IOException(); + ioe.initCause(error); + if (traceScope != null) { + traceScope.getSpan().addTimelineAnnotation("Error getting row lock"); + } + throw ioe; } finally { // Clean up the counts just in case this was the thing keeping the context alive. if (!success && rowLockContext != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3f911c1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 5baab39..a372644 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -413,6 +413,33 @@ public class TestFromClientSide3 { } } + // Test Table.batch with large amount of mutations against the same key. + // It used to trigger read lock's "Maximum lock count exceeded" Error. 
+ @Test + public void testHTableWithLargeBatch() throws Exception { + Table table = TEST_UTIL.createTable(TableName.valueOf("testHTableWithLargeBatch"), + new byte[][] { FAMILY }); + int sixtyFourK = 64 * 1024; + try { + List actions = new ArrayList(); + Object[] results = new Object[(sixtyFourK + 1) * 2]; + + for (int i = 0; i < sixtyFourK + 1; i ++) { + Put put1 = new Put(ROW); + put1.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put1); + + Put put2 = new Put(ANOTHERROW); + put2.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put2); + } + + table.batch(actions, results); + } finally { + table.close(); + } + } + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3f911c1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index d9415df..b9f2290 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -663,12 +663,12 @@ public class TestAtomicOperation { } @Override - public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock) - throws IOException { + public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock, + final RowLock prevRowLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock)); + return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock, prevRowLock)); } public class WrappedRowLock implements RowLock {