HBASE-18375: Fix the bug where pool chunks from ChunkCreator are 
garbage-collected instead of being returned to the pool, because no strong reference to them is kept


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/75a6b368
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/75a6b368
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/75a6b368

Branch: refs/heads/HBASE-14070.HLC
Commit: 75a6b36849c58d6a751f57226ab0c8f7884a9e87
Parents: 092dc6d
Author: anastas <anas...@yahoo-inc.com>
Authored: Thu Aug 17 18:23:19 2017 +0300
Committer: anastas <anas...@yahoo-inc.com>
Committed: Thu Aug 17 18:23:19 2017 +0300

----------------------------------------------------------------------
 .../regionserver/CellChunkImmutableSegment.java |   5 +-
 .../hadoop/hbase/regionserver/ChunkCreator.java | 171 +++++++++----------
 .../hbase/regionserver/CompactionPipeline.java  |  19 ++-
 .../hbase/regionserver/MemStoreLABImpl.java     |  27 ++-
 .../hbase/regionserver/TestMemStoreLAB.java     |  12 +-
 .../TestMemstoreLABWithoutPool.java             |   3 +-
 6 files changed, 126 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index cdda279..3653166 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -176,10 +176,7 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
   private int createCellReference(ByteBufferKeyValue cell, ByteBuffer 
idxBuffer, int idxOffset) {
     int offset = idxOffset;
     int dataChunkID = cell.getChunkId();
-    // ensure strong pointer to data chunk, as index is no longer directly 
points to it
-    Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID);
-    // if c is null, it means that this cell chunks was already released 
shouldn't happen
-    assert (c!=null);
+
     offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID);    // 
write data chunk id
     offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset());      
    // offset
     offset = ByteBufferUtils.putInt(idxBuffer, offset, 
KeyValueUtil.length(cell)); // length

http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index 7e5395c..e818426 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -58,21 +57,8 @@ public class ChunkCreator {
   // the header size need to be changed in case chunk id size is changed
   public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
 
-  // An object pointed by a weak reference can be garbage collected, in 
opposite to an object
-  // referenced by a strong (regular) reference. Every chunk created via 
ChunkCreator is referenced
-  // from either weakChunkIdMap or strongChunkIdMap.
-  // Upon chunk C creation, C's ID is mapped into weak reference to C, in 
order not to disturb C's
-  // GC in case all other reference to C are going to be removed.
-  // When chunk C is referenced from CellChunkMap (via C's ID) it is possible 
to GC the chunk C.
-  // To avoid that upon inserting C into CellChunkMap, C's ID is mapped into 
strong (regular)
-  // reference to C.
-
-  // map that doesn't influence GC
-  private Map<Integer, WeakReference<Chunk>> weakChunkIdMap =
-      new ConcurrentHashMap<Integer, WeakReference<Chunk>>();
-
-  // map that keeps chunks from garbage collection
-  private Map<Integer, Chunk> strongChunkIdMap = new 
ConcurrentHashMap<Integer, Chunk>();
+  // mapping from chunk IDs to chunks
+  private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, 
Chunk>();
 
   private final int chunkSize;
   private final boolean offheap;
@@ -95,7 +81,7 @@ public class ChunkCreator {
   }
 
   /**
-   * Initializes the instance of MSLABChunkCreator
+   * Initializes the instance of ChunkCreator
    * @param chunkSize the chunkSize
    * @param offheap indicates if the chunk is to be created offheap or not
    * @param globalMemStoreSize  the global memstore size
@@ -120,10 +106,19 @@ public class ChunkCreator {
   }
 
   /**
-   * Creates and inits a chunk.
+   * Creates and inits a chunk. The default implementation.
    * @return the chunk that was initialized
    */
   Chunk getChunk() {
+    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
+  }
+
+  /**
+   * Creates and inits a chunk.
+   * @return the chunk that was initialized
+   * @param chunkIndexType whether the requested chunk is going to be used 
with CellChunkMap index
+   */
+  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
     Chunk chunk = null;
     if (pool != null) {
       //  the pool creates the chunk internally. The chunk#init() call happens 
here
@@ -137,68 +132,45 @@ public class ChunkCreator {
       }
     }
     if (chunk == null) {
-      chunk = createChunk();
+      // the second boolean parameter means:
+      // if CellChunkMap index is requested, put allocated on demand chunk 
mapping into chunkIdMap
+      chunk = createChunk(false, chunkIndexType);
     }
-    // put this chunk initially into the weakChunkIdMap
-    this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk));
+
     // now we need to actually do the expensive memory allocation step in case 
of a new chunk,
     // else only the offset is set to the beginning of the chunk to accept 
allocations
     chunk.init();
     return chunk;
   }
 
-  private Chunk createChunk() {
-    return createChunk(false);
-  }
-
   /**
    * Creates the chunk either onheap or offheap
    * @param pool indicates if the chunks have to be created which will be used 
by the Pool
+   * @param chunkIndexType
    * @return the chunk
    */
-  private Chunk createChunk(boolean pool) {
+  private Chunk createChunk(boolean pool, CompactingMemStore.IndexType 
chunkIndexType) {
+    Chunk chunk = null;
     int id = chunkID.getAndIncrement();
     assert id > 0;
     // do not create offheap chunk on demand
     if (pool && this.offheap) {
-      return new OffheapChunk(chunkSize, id, pool);
+      chunk = new OffheapChunk(chunkSize, id, pool);
     } else {
-      return new OnheapChunk(chunkSize, id, pool);
+      chunk = new OnheapChunk(chunkSize, id, pool);
     }
+    if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
+      // put the pool chunk into the chunkIdMap so it is not GC-ed
+      this.chunkIdMap.put(chunk.getId(), chunk);
+    }
+    return chunk;
   }
 
   @VisibleForTesting
   // Used to translate the ChunkID into a chunk ref
   Chunk getChunk(int id) {
-    WeakReference<Chunk> ref = weakChunkIdMap.get(id);
-    if (ref != null) {
-      return ref.get();
-    }
-    // check also the strong mapping
-    return strongChunkIdMap.get(id);
-  }
-
-  // transfer the weak pointer to be a strong chunk pointer
-  Chunk saveChunkFromGC(int chunkID) {
-    Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is 
already protected
-    if (c != null)                           // with strong pointer
-      return c;
-    WeakReference<Chunk> ref = weakChunkIdMap.get(chunkID);
-    if (ref != null) {
-      c = ref.get();
-    }
-    if (c != null) {
-      // put this strong reference to chunk into the strongChunkIdMap
-      // the read of the weakMap is always happening before the read of the 
strongMap
-      // so no synchronization issues here
-      this.strongChunkIdMap.put(chunkID, c);
-      this.weakChunkIdMap.remove(chunkID);
-      return c;
-    }
-    // we should actually never return null as someone should not ask to save 
from GC a chunk,
-    // which is already released. However, we are not asserting it here and we 
let the caller
-    // to deal with the return value an assert if needed
-    return null;
+    // can return null if chunk was never mapped
+    return chunkIdMap.get(id);
   }
 
   int getChunkSize() {
@@ -210,30 +182,23 @@ public class ChunkCreator {
   }
 
   private void removeChunks(Set<Integer> chunkIDs) {
-    this.weakChunkIdMap.keySet().removeAll(chunkIDs);
-    this.strongChunkIdMap.keySet().removeAll(chunkIDs);
+    this.chunkIdMap.keySet().removeAll(chunkIDs);
   }
 
   Chunk removeChunk(int chunkId) {
-    WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId);
-    Chunk strong = this.strongChunkIdMap.remove(chunkId);
-    if (weak != null) {
-      return weak.get();
-    }
-    return strong;
+    return this.chunkIdMap.remove(chunkId);
   }
 
   @VisibleForTesting
-  // the chunks in the weakChunkIdMap may already be released so we shouldn't 
relay
+  // the chunks in the chunkIdMap may already be released so we shouldn't relay
   // on this counting for strong correctness. This method is used only in 
testing.
-  int size() {
-    return this.weakChunkIdMap.size()+this.strongChunkIdMap.size();
+  int numberOfMappedChunks() {
+    return this.chunkIdMap.size();
   }
 
   @VisibleForTesting
   void clearChunkIds() {
-    this.strongChunkIdMap.clear();
-    this.weakChunkIdMap.clear();
+    this.chunkIdMap.clear();
   }
 
   /**
@@ -262,7 +227,9 @@ public class ChunkCreator {
       this.poolSizePercentage = poolSizePercentage;
       this.reclaimedChunks = new LinkedBlockingQueue<>();
       for (int i = 0; i < initialCount; i++) {
-        Chunk chunk = createChunk(true);
+        // Chunks from pool are covered with strong references anyway
+        // TODO: change to CHUNK_MAP if it is generally defined
+        Chunk chunk = createChunk(true, 
CompactingMemStore.IndexType.ARRAY_MAP);
         chunk.init();
         reclaimedChunks.add(chunk);
       }
@@ -281,7 +248,7 @@ public class ChunkCreator {
      * then.
      * Note: Chunks returned by this pool must be put back to the pool after 
its use.
      * @return a chunk
-     * @see #putbackChunks(Set)
+     * @see #putbackChunks(Chunk)
      */
     Chunk getChunk() {
       Chunk chunk = reclaimedChunks.poll();
@@ -294,7 +261,8 @@ public class ChunkCreator {
           long created = this.chunkCount.get();
           if (created < this.maxCount) {
             if (this.chunkCount.compareAndSet(created, created + 1)) {
-              chunk = createChunk(true);
+              // TODO: change to CHUNK_MAP if it is generally defined
+              chunk = createChunk(true, 
CompactingMemStore.IndexType.ARRAY_MAP);
               break;
             }
           } else {
@@ -308,21 +276,16 @@ public class ChunkCreator {
     /**
      * Add the chunks to the pool, when the pool achieves the max size, it 
will skip the remaining
      * chunks
-     * @param chunks
+     * @param c
      */
-    private void putbackChunks(Set<Integer> chunks) {
-      int toAdd = Math.min(chunks.size(), this.maxCount - 
reclaimedChunks.size());
-      Iterator<Integer> iterator = chunks.iterator();
-      while (iterator.hasNext()) {
-        Integer chunkId = iterator.next();
-        // remove the chunks every time though they are from the pool or not
-        Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
-        if (chunk != null) {
-          if (chunk.isFromPool() && toAdd > 0) {
-            reclaimedChunks.add(chunk);
-          }
-          toAdd--;
-        }
+    private void putbackChunks(Chunk c) {
+      int toAdd = this.maxCount - reclaimedChunks.size();
+      if (c.isFromPool() && toAdd > 0) {
+        reclaimedChunks.add(c);
+      } else {
+        // remove the chunk (that is not going to pool)
+        // though it is initially from the pool or not
+        ChunkCreator.this.removeChunk(c.getId());
       }
     }
 
@@ -433,6 +396,20 @@ public class ChunkCreator {
     return 0;
   }
 
+  @VisibleForTesting
+  boolean isChunkInPool(int chunkId) {
+    if (pool != null) {
+      // chunks that are from pool will return true chunk reference not null
+      Chunk c = getChunk(chunkId);
+      if (c==null) {
+        return false;
+      }
+      return pool.reclaimedChunks.contains(c);
+    }
+
+    return false;
+  }
+
   /*
    * Only used in testing
    */
@@ -444,10 +421,24 @@ public class ChunkCreator {
   }
 
   synchronized void putbackChunks(Set<Integer> chunks) {
-    if (pool != null) {
-      pool.putbackChunks(chunks);
-    } else {
+    // if there is no pool just try to clear the chunkIdMap in case there is 
something
+    if ( pool == null ) {
       this.removeChunks(chunks);
+      return;
     }
+
+    // if there is pool, go over all chunk IDs that came back, the chunks may 
be from pool or not
+    for (int chunkID : chunks) {
+      // translate chunk ID to chunk, if chunk initially wasn't in pool
+      // this translation will (most likely) return null
+      Chunk chunk = ChunkCreator.this.getChunk(chunkID);
+      if (chunk != null) {
+        pool.putbackChunks(chunk);
+      }
+      // if chunk is null, it was never covered by the chunkIdMap (and so 
wasn't in pool also),
+      // so we have nothing to do on its release
+    }
+    return;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 5136f24..f281392 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.util.ClassSize;
  * suffix of the pipeline.
  *
  * The synchronization model is copy-on-write. Methods which change the 
structure of the
- * pipeline (pushHead() and swap()) apply their changes in the context of a 
lock. They also make
- * a read-only copy of the pipeline's list. Read methods read from a read-only 
copy. If a read
- * method accesses the read-only copy more than once it makes a local copy of 
it
- * to ensure it accesses the same copy.
+ * pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes 
in the context of a
+ * lock. They also make a read-only copy of the pipeline's list. Read methods 
read from a
+ * read-only copy. If a read method accesses the read-only copy more than once 
it makes a local
+ * copy of it to ensure it accesses the same copy.
  *
  * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() 
are also
  * protected by a lock since they need to have a consistent (atomic) view of 
the pipeline list
@@ -261,6 +261,8 @@ public class CompactionPipeline {
 
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment 
segment,
       boolean closeSegmentsInSuffix) {
+    pipeline.removeAll(suffix);
+    if(segment != null) pipeline.addLast(segment);
     // During index merge we won't be closing the segments undergoing the 
merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there 
wont be any data copy
     // from old MSLABs. So the new cells in new segment also refers to same 
chunks. In case of data
@@ -272,15 +274,20 @@ public class CompactionPipeline {
         itemInSuffix.close();
       }
     }
-    pipeline.removeAll(suffix);
-    if(segment != null) pipeline.addLast(segment);
   }
 
   // replacing one segment in the pipeline with a new one exactly at the same 
index
   // need to be called only within synchronized block
+  
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
+      justification="replaceAtIndex is invoked under a synchronize block so 
safe")
   private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
     pipeline.set(idx, newSegment);
     readOnlyCopy = new LinkedList<>(pipeline);
+    // the version increment is indeed needed, because the swap uses 
removeAll() method of the
+    // linked-list that compares the objects to find what to remove.
+    // The flattening changes the segment object completely (creation pattern) 
and so
+    // swap will not proceed correctly after concurrent flattening.
+    version++;
   }
 
   public Segment getTail() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 85e2abe..2ae665e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -78,6 +78,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
   private final int chunkSize;
   private final int maxAlloc;
   private final ChunkCreator chunkCreator;
+  private final CompactingMemStore.IndexType idxType; // what index is used 
for corresponding segment
 
   // This flag is for closing this instance, its set when clearing snapshot of
   // memstore
@@ -100,6 +101,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(maxAlloc <= chunkSize,
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+    idxType = CompactingMemStore.IndexType.valueOf(conf.get(
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
   }
 
   @Override
@@ -239,7 +243,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
         if (c != null) {
           return c;
         }
-        c = this.chunkCreator.getChunk();
+        c = this.chunkCreator.getChunk(idxType);
         if (c != null) {
           // set the curChunk. No need of CAS as only one thread will be here
           curChunk.set(c);
@@ -253,12 +257,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return null;
   }
 
-  // Returning a new chunk, without replacing current chunk,
-  // meaning MSLABImpl does not make the returned chunk as CurChunk.
-  // The space on this chunk will be allocated externally
-  // The interface is only for external callers
+  /* Creating chunk to be used as index chunk in CellChunkMap, part of the 
chunks array.
+  ** Returning a new chunk, without replacing current chunk,
+  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
+  ** The space on this chunk will be allocated externally.
+  ** The interface is only for external callers
+  */
   @Override
   public Chunk getNewExternalChunk() {
+    // the new chunk is going to be part of the chunk array and will always be 
referenced
     Chunk c = this.chunkCreator.getChunk();
     chunks.add(c.getId());
     return c;
@@ -280,4 +287,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
     }
     return pooledChunks;
   }
+
+  @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
+    int i = 0;
+    for (Integer id : this.chunks) {
+      if (chunkCreator.isChunkInPool(id)) {
+        i++;
+      }
+    }
+    return i;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index f171dd0..06b9c40 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -247,14 +247,16 @@ public class TestMemStoreLAB {
         }
       }
       // none of the chunkIds would have been returned back
-      assertTrue("All the chunks must have been cleared", 
ChunkCreator.INSTANCE.size() != 0);
+      assertTrue("All the chunks must have been cleared",
+          ChunkCreator.INSTANCE.numberOfMappedChunks() != 0);
+      int pooledChunksNum = mslab.getPooledChunks().size();
       // close the mslab
       mslab.close();
-      // make sure all chunks reclaimed or removed from chunk queue
-      int queueLength = mslab.getPooledChunks().size();
+      // make sure all chunks where reclaimed back to pool
+      int queueLength = mslab.getNumOfChunksReturnedToPool();
       assertTrue("All chunks in chunk queue should be reclaimed or removed"
-          + " after mslab closed but actually: " + queueLength,
-        queueLength == 0);
+          + " after mslab closed but actually: " + 
(pooledChunksNum-queueLength),
+          pooledChunksNum-queueLength == 0);
     } finally {
       ChunkCreator.INSTANCE = oldInstance;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
index 96be8ec..d3f9bc1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
@@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool {
       mslab[i].close();
     }
     // all of the chunkIds would have been returned back
-    assertTrue("All the chunks must have been cleared", 
ChunkCreator.INSTANCE.size() == 0);
+    assertTrue("All the chunks must have been cleared",
+        ChunkCreator.INSTANCE.numberOfMappedChunks() == 0);
   }
 
   private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String 
threadName,

Reply via email to