This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new ce4974f  HBASE-26210 HBase Write should be doomed to hang when cell 
size exceeds InmemoryFlushSize for CompactingMemStore (#3604)
ce4974f is described below

commit ce4974f313296e8515701af2c738779376f2d5bb
Author: chenglei <cheng...@apache.org>
AuthorDate: Wed Sep 1 15:50:51 2021 +0800

    HBASE-26210 HBase Write should be doomed to hang when cell size exceeds 
InmemoryFlushSize for CompactingMemStore (#3604)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../hbase/regionserver/CompactingMemStore.java     |  93 +++----
 .../hbase/regionserver/CompactionPipeline.java     |   2 +-
 .../TestCompactingToCellFlatMapMemStore.java       |   6 +-
 .../hadoop/hbase/regionserver/TestHStore.java      | 267 +++++++++++++++++++--
 4 files changed, 303 insertions(+), 65 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index a8fee3e..e20c924 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -211,7 +211,7 @@ public class CompactingMemStore extends AbstractMemStore {
       stopCompaction();
       // region level lock ensures pushing active to pipeline is done in 
isolation
       // no concurrent update operations trying to flush the active segment
-      pushActiveToPipeline(getActive());
+      pushActiveToPipeline(getActive(), true);
       resetTimeOfOldestEdit();
       snapshotId = EnvironmentEdgeManager.currentTime();
       // in both cases whatever is pushed to snapshot is cleared from the 
pipeline
@@ -419,33 +419,61 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * Check whether anything need to be done based on the current active set 
size.
-   * The method is invoked upon every addition to the active set.
-   * For CompactingMemStore, flush the active set to the read-only memory if 
it's
-   * size is above threshold
+   * Check whether anything needs to be done based on the current active set 
size. The method is
+   * invoked upon every addition to the active set. For CompactingMemStore, 
flush the active set to
+   * the read-only memory if its size is above the threshold
    * @param currActive intended segment to update
    * @param cellToAdd cell to be added to the segment
    * @param memstoreSizing object to accumulate changed size
-   * @return true if the cell can be added to the
+   * @return true if the cell can be added to the currActive
    */
-  private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell 
cellToAdd,
+  protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell 
cellToAdd,
       MemStoreSizing memstoreSizing) {
-    if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
-      if (currActive.setInMemoryFlushed()) {
-        flushInMemory(currActive);
-        if (setInMemoryCompactionFlag()) {
-          // The thread is dispatched to do in-memory compaction in the 
background
-          InMemoryCompactionRunnable runnable = new 
InMemoryCompactionRunnable();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Dispatching the MemStore in-memory flush for store " + 
store
-                .getColumnFamilyName());
-          }
-          getPool().execute(runnable);
+    long cellSize = MutableSegment.getCellLength(cellToAdd);
+    boolean successAdd = false;
+    while (true) {
+      long segmentDataSize = currActive.getDataSize();
+      if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
+        // when replaying edits from WAL there is no need in in-memory flush 
regardless the size
+        // otherwise size below flush threshold try to update atomically
+        break;
+      }
+      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + 
cellSize)) {
+        if (memstoreSizing != null) {
+          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
         }
+        successAdd = true;
+        break;
+      }
+    }
+
+    if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
+      // size above flush threshold so we flush in memory
+      this.tryFlushInMemoryAndCompactingAsync(currActive);
+    }
+    return successAdd;
+  }
+
+  /**
+   * Try to flush the currActive in memory and submit the background
+   * {@link InMemoryCompactionRunnable} to
+   * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one 
thread can do the actual
+   * flushing in memory.
+   * @param currActive current active segment to be flushed in memory.
+   */
+  private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
+    if (currActive.setInMemoryFlushed()) {
+      flushInMemory(currActive);
+      if (setInMemoryCompactionFlag()) {
+        // The thread is dispatched to do in-memory compaction in the 
background
+        InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "Dispatching the MemStore in-memory flush for store " + 
store.getColumnFamilyName());
+        }
+        getPool().execute(runnable);
       }
-      return false;
     }
-    return true;
   }
 
   // externally visible only for tests
@@ -504,27 +532,6 @@ public class CompactingMemStore extends AbstractMemStore {
     return getRegionServices().getInMemoryCompactionPool();
   }
 
-  @VisibleForTesting
-  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell 
cellToAdd,
-      MemStoreSizing memstoreSizing) {
-    long cellSize = MutableSegment.getCellLength(cellToAdd);
-    long segmentDataSize = currActive.getDataSize();
-    while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
-      // when replaying edits from WAL there is no need in in-memory flush 
regardless the size
-      // otherwise size below flush threshold try to update atomically
-      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + 
cellSize)) {
-        if (memstoreSizing != null) {
-          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
-        }
-        // enough space for cell - no need to flush
-        return false;
-      }
-      segmentDataSize = currActive.getDataSize();
-    }
-    // size above flush threshold
-    return true;
-  }
-
   /**
    * The request to cancel the compaction asynchronous task (caused by 
in-memory flush)
    * The compaction may still happen if the request was sent too late
@@ -536,10 +543,6 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  protected void pushActiveToPipeline(MutableSegment currActive) {
-    pushActiveToPipeline(currActive, true);
-  }
-
   /**
    * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls 
this method, due to
    * concurrent writes and because we first add cell size to 
currActive.getDataSize and then
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 194e065..62d1f59 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -229,7 +229,7 @@ public class CompactionPipeline {
         if ( s.canBeFlattened() ) {
           s.waitForUpdates(); // to ensure all updates preceding s in-memory 
flush have completed
           if (s.isEmpty()) {
-            // after s.waitForUpdates() is called, there is no updates 
preceding,if no cells in s,
+            // after s.waitForUpdates() is called, there are no updates 
pending; if no cells in s,
             // we can skip it.
             continue;
           }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 8291172..257cdf1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -823,11 +823,11 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
 
     // The in-memory flush size is bigger than the size of a single cell,
     // but smaller than the size of two cells.
-    // Therefore, the two created cells are flattened together.
+    // Therefore, the two created cells are flushed together as a single 
CSLMImmutableSegment and
+    // flattened.
     totalHeapSize = MutableSegment.DEEP_OVERHEAD
         + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-        + 1 * oneCellOnCSLMHeapSize
-        + 1 * oneCellOnCCMHeapSize;
+        + 2 * oneCellOnCCMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 1c73d1c..b2582ad 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -50,6 +50,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntBinaryOperator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -1733,8 +1737,11 @@ public class TestHStore {
     assertArrayEquals(table, hFileContext.getTableName());
   }
 
+  // This test is for HBASE-26026: HBase write gets stuck when the active segment 
has no cell
+  // but its dataSize exceeds inmemoryFlushSize
   @Test
-  public void testCompactingMemStoreStuckBug26026() throws IOException, 
InterruptedException {
+  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
+      throws IOException, InterruptedException {
     Configuration conf = HBaseConfiguration.create();
 
     byte[] smallValue = new byte[3];
@@ -1758,12 +1765,15 @@ public class TestHStore {
     MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) 
store.memstore);
     assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == 
flushByteSize);
     myCompactingMemStore.smallCellPreUpdateCounter.set(0);
-    myCompactingMemStore.smallCellPostUpdateCounter.set(0);
     myCompactingMemStore.largeCellPreUpdateCounter.set(0);
-    myCompactingMemStore.largeCellPostUpdateCounter.set(0);
 
+    final AtomicReference<Throwable> exceptionRef = new 
AtomicReference<Throwable>();
     Thread smallCellThread = new Thread(() -> {
-      store.add(smallCell, new NonThreadSafeMemStoreSizing());
+      try {
+        store.add(smallCell, new NonThreadSafeMemStoreSizing());
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+      }
     });
     smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
     smallCellThread.start();
@@ -1771,9 +1781,9 @@ public class TestHStore {
     String oldThreadName = Thread.currentThread().getName();
     try {
       /**
-       * 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory 
first, when largeCellThread
-       * enters CompactingMemStore.shouldFlushInMemory, 
CompactingMemStore.active.getDataSize could
-       * not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory 
return true.
+       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize 
first, then
+       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, 
and then largeCellThread
+       * invokes flushInMemory.
        * <p/>
        * 2. After largeCellThread finished CompactingMemStore.flushInMemory 
method, smallCellThread
        * can add cell to currentActive . That is to say when largeCellThread 
called flushInMemory
@@ -1792,6 +1802,143 @@ public class TestHStore {
       Thread.currentThread().setName(oldThreadName);
     }
 
+    assertTrue(exceptionRef.get() == null);
+
+  }
+
+  // This test is for HBASE-26210: HBase write gets stuck when there is a cell 
whose size exceeds
+  // InmemoryFlushSize
+  @Test(timeout = 60000)
+  public void testCompactingMemStoreCellExceedInmemoryFlushSize()
+      throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
+
+    init(name.getMethodName(), conf, 
ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    int size = (int) ((CompactingMemStore) 
store.memstore).getInmemoryFlushSize();
+    byte[] value = new byte[size + 1];
+
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    long timestamp = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    Cell cell = createCell(qf1, timestamp, seqId, value);
+    int cellByteSize = MutableSegment.getCellLength(cell);
+    store.add(cell, memStoreSizing);
+    assertTrue(memStoreSizing.getCellsCount() == 1);
+    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
+  }
+
+  // This test is also for HBASE-26210: write a large cell and a small cell 
concurrently when
+  // InmemoryFlushSize is smaller than, equal to, and larger than the cell size.
+  @Test
+  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
+      throws IOException, InterruptedException {
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + 
largeCellByteSize - 1);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + 
largeCellByteSize);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + 
largeCellByteSize + 1);
+  }
+
+  private void doWriteTestLargeCellAndSmallCellConcurrently(
+      IntBinaryOperator getFlushByteSize)
+      throws IOException, InterruptedException {
+
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[100];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, 
largeCellByteSize);
+    boolean flushByteSizeLessThanSmallAndLargeCellSize =
+        flushByteSize < (smallCellByteSize + largeCellByteSize);
+
+    conf.set(HStore.MEMSTORE_CLASS_NAME, 
MyCompactingMemStore3.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 
0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 
String.valueOf(flushByteSize * 200));
+
+
+    init(name.getMethodName(), conf, 
ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) 
store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == 
flushByteSize);
+    myCompactingMemStore.disableCompaction();
+    if (flushByteSizeLessThanSmallAndLargeCellSize) {
+      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
+    } else {
+      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
+    }
+
+
+    final ThreadSafeMemStoreSizing memStoreSizing = new 
ThreadSafeMemStoreSizing();
+    final AtomicLong totalCellByteSize = new AtomicLong(0);
+    final AtomicReference<Throwable> exceptionRef = new 
AtomicReference<Throwable>();
+    Thread smallCellThread = new Thread(() -> {
+      try {
+        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
+          long currentTimestamp = timestamp + i;
+          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
+          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
+          store.add(cell, memStoreSizing);
+        }
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+
+      }
+    });
+    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
+    smallCellThread.start();
+
+    String oldThreadName = Thread.currentThread().getName();
+    try {
+      /**
+       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
+       * <p/>
+       * 1.smallCellThread enters 
MyCompactingMemStore3.checkAndAddToActiveSize first, then
+       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, 
and then
+       * largeCellThread invokes flushInMemory.
+       * <p/>
+       * 2. After largeCellThread finished CompactingMemStore.flushInMemory 
method, smallCellThread
+       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
+       * <p/>
+       * When flushByteSizeLessThanSmallAndLargeCellSize is false: 
smallCellThread and
+       * largeCellThread concurrently write one cell and wait for each other, and 
then write another
+       * cell etc.
+       */
+      
Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
+      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
+        long currentTimestamp = timestamp + i;
+        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
+        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
+        store.add(cell, memStoreSizing);
+      }
+      smallCellThread.join();
+
+      assertTrue(exceptionRef.get() == null);
+      assertTrue(memStoreSizing.getCellsCount() == 
(MyCompactingMemStore3.CELL_COUNT * 2));
+      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
+      if (flushByteSizeLessThanSmallAndLargeCellSize) {
+        assertTrue(myCompactingMemStore.flushCounter.get() == 
MyCompactingMemStore3.CELL_COUNT);
+      } else {
+        assertTrue(
+          myCompactingMemStore.flushCounter.get() <= 
(MyCompactingMemStore3.CELL_COUNT - 1));
+      }
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+    }
   }
 
   private HStoreFile mockStoreFileWithLength(long length) {
@@ -1895,7 +2042,7 @@ public class TestHStore {
       return new ArrayList<>(capacity);
     }
     @Override
-    protected void pushActiveToPipeline(MutableSegment active) {
+    protected void pushActiveToPipeline(MutableSegment active, boolean 
checkEmpty) {
       if (START_TEST.get()) {
         try {
           getScannerLatch.await();
@@ -1904,7 +2051,7 @@ public class TestHStore {
         }
       }
 
-      super.pushActiveToPipeline(active);
+      super.pushActiveToPipeline(active, checkEmpty);
       if (START_TEST.get()) {
         snapshotLatch.countDown();
       }
@@ -2001,8 +2148,6 @@ public class TestHStore {
     private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
     private final AtomicInteger largeCellPreUpdateCounter = new 
AtomicInteger(0);
     private final AtomicInteger smallCellPreUpdateCounter = new 
AtomicInteger(0);
-    private final AtomicInteger largeCellPostUpdateCounter = new 
AtomicInteger(0);
-    private final AtomicInteger smallCellPostUpdateCounter = new 
AtomicInteger(0);
 
     public MyCompactingMemStore2(Configuration conf, CellComparatorImpl 
cellComparator,
         HStore store, RegionServicesForStores regionServices,
@@ -2010,16 +2155,17 @@ public class TestHStore {
       super(conf, cellComparator, store, regionServices, compactionPolicy);
     }
 
-    protected boolean shouldFlushInMemory(MutableSegment currActive, Cell 
cellToAdd,
+    @Override
+    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell 
cellToAdd,
         MemStoreSizing memstoreSizing) {
       if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
         int currentCount = largeCellPreUpdateCounter.incrementAndGet();
         if (currentCount <= 1) {
           try {
             /**
-             * smallCellThread enters super.shouldFlushInMemory first, when 
largeCellThread enters
-             * super.shouldFlushInMemory, currActive.getDataSize could not 
accommodate cellToAdd and
-             * super.shouldFlushInMemory return true.
+             * smallCellThread enters 
CompactingMemStore.checkAndAddToActiveSize first, then
+             * largeCellThread enters 
CompactingMemStore.checkAndAddToActiveSize, and then
+             * largeCellThread invokes flushInMemory.
              */
             preCyclicBarrier.await();
           } catch (Throwable e) {
@@ -2028,7 +2174,7 @@ public class TestHStore {
         }
       }
 
-      boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, 
memstoreSizing);
+      boolean returnValue = super.checkAndAddToActiveSize(currActive, 
cellToAdd, memstoreSizing);
       if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
         try {
           preCyclicBarrier.await();
@@ -2071,4 +2217,93 @@ public class TestHStore {
     }
 
   }
+
+  public static class MyCompactingMemStore3 extends CompactingMemStore {
+    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
+    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
+
+    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
+    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger flushCounter = new AtomicInteger(0);
+    private static final int CELL_COUNT = 5;
+    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
+
+    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl 
cellComparator,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, cellComparator, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell 
cellToAdd,
+        MemStoreSizing memstoreSizing) {
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        return super.checkAndAddToActiveSize(currActive, cellToAdd, 
memstoreSizing);
+      }
+      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+        try {
+          preCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      boolean returnValue = super.checkAndAddToActiveSize(currActive, 
cellToAdd, memstoreSizing);
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          preCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return returnValue;
+    }
+
+    @Override
+    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
+      super.postUpdate(currentActiveMutableSegment);
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        try {
+          postCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+        return;
+      }
+
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          postCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
+      super.flushInMemory(currentActiveMutableSegment);
+      flushCounter.incrementAndGet();
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        return;
+      }
+
+      
assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
+      try {
+        postCyclicBarrier.await();
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+
+    void disableCompaction() {
+      allowCompaction.set(false);
+    }
+
+    void enableCompaction() {
+      allowCompaction.set(true);
+    }
+
+  }
 }

Reply via email to