HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c2c2178b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c2c2178b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c2c2178b Branch: refs/heads/hbase-12439 Commit: c2c2178b2eebe4439eadec6b37fae2566944c16b Parents: c8cd921 Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Authored: Mon Apr 17 09:10:59 2017 +0530 Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com> Committed: Mon Apr 17 09:28:24 2017 +0530 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/CellUtil.java | 24 -- .../org/apache/hadoop/hbase/ExtendedCell.java | 10 + .../org/apache/hadoop/hbase/master/HMaster.java | 2 + .../hbase/regionserver/ByteBufferChunkCell.java | 48 +++ .../apache/hadoop/hbase/regionserver/Chunk.java | 60 ++- .../hadoop/hbase/regionserver/ChunkCreator.java | 404 +++++++++++++++++++ .../hbase/regionserver/HRegionServer.java | 14 +- .../hbase/regionserver/MemStoreChunkPool.java | 265 ------------ .../hadoop/hbase/regionserver/MemStoreLAB.java | 4 +- .../hbase/regionserver/MemStoreLABImpl.java | 171 ++++---- .../regionserver/NoTagByteBufferChunkCell.java | 48 +++ .../hadoop/hbase/regionserver/OffheapChunk.java | 31 +- .../hadoop/hbase/regionserver/OnheapChunk.java | 32 +- .../hadoop/hbase/HBaseTestingUtility.java | 3 + .../coprocessor/TestCoprocessorInterface.java | 4 + .../TestRegionObserverScannerOpenHook.java | 3 + .../coprocessor/TestRegionObserverStacking.java | 3 + .../io/hfile/TestScannerFromBucketCache.java | 3 + .../hadoop/hbase/master/TestCatalogJanitor.java | 7 + .../hadoop/hbase/regionserver/TestBulkLoad.java | 2 +- .../hbase/regionserver/TestCellFlatSet.java | 2 +- .../regionserver/TestCompactingMemStore.java | 37 +- .../TestCompactingToCellArrayMapMemStore.java | 16 +- .../TestCompactionArchiveConcurrentClose.java | 1 + .../TestCompactionArchiveIOException.java | 1 + .../regionserver/TestCompactionPolicy.java | 1 + .../hbase/regionserver/TestDefaultMemStore.java | 14 +- .../regionserver/TestFailedAppendAndSync.java | 1 + .../hbase/regionserver/TestHMobStore.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 2 + .../regionserver/TestHRegionReplayEvents.java | 2 +- .../regionserver/TestMemStoreChunkPool.java | 48 +-- .../hbase/regionserver/TestMemStoreLAB.java | 27 +- .../TestMemstoreLABWithoutPool.java | 168 ++++++++ .../hbase/regionserver/TestRecoveredEdits.java | 1 + .../hbase/regionserver/TestRegionIncrement.java | 1 + .../hadoop/hbase/regionserver/TestStore.java | 1 + .../TestStoreFileRefresherChore.java | 1 + .../hbase/regionserver/TestWALLockup.java | 1 + .../TestWALMonotonicallyIncreasingSeqId.java | 1 + .../hbase/regionserver/wal/TestDurability.java | 3 + 41 files changed, 990 insertions(+), 479 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index e1bc969..56de21b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -3135,28 +3135,4 @@ public final class CellUtil { return Type.DeleteFamily.getCode(); } } - - /** - * Clone the passed cell by copying its data into the passed buf. - */ - public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) { - int tagsLen = cell.getTagsLength(); - if (cell instanceof ExtendedCell) { - ((ExtendedCell) cell).write(buf, offset); - } else { - // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the - // other case also. The data fragments within Cell is copied into buf as in KeyValue - // serialization format only. - KeyValueUtil.appendTo(cell, buf, offset, true); - } - if (tagsLen == 0) { - // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class - // which directly return tagsLen as 0. So we avoid parsing many length components in - // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell - // call getTagsLength(). - return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } else { - return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 517873f..10f20ca 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize; public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, Cloneable { + public static int CELL_NOT_BASED_ON_CHUNK = -1; /** * Write this cell to an OutputStream in a {@link KeyValue} format. * <br> KeyValue format <br> @@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam * @return The deep cloned cell */ Cell deepClone(); + + /** + * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized + * chunks as in case of MemstoreLAB + * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1 + */ + default int getChunkId() { + return CELL_NOT_BASED_ON_CHUNK; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bb9f282..f9670e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices { this.masterActiveTime = System.currentTimeMillis(); // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. + // Initialize the chunkCreator + initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(this); this.walManager = new MasterWalManager(this); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java new file mode 100644 index 0000000..a8f1000 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset + * @see MemStoreLAB + */ +//TODO : When moving this cell to CellChunkMap we will have the following things +// to be serialized +// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes +@InterfaceAudience.Private +public class ByteBufferChunkCell extends ByteBufferKeyValue { + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public int getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toInt(buf, 0); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index 2cbf0a3..fc4aa0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * A chunk of memory out of which allocations are sliced. @@ -46,13 +48,41 @@ public abstract class Chunk { /** Size of chunk in bytes */ protected final int size; + // The unique id associated with the chunk. + private final int id; + + // indicates if the chunk is formed by ChunkCreator#MemstorePool + private final boolean fromPool; + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. + * @param size in bytes + * @param id the chunk id + */ + public Chunk(int size, int id) { + this(size, id, false); + } + /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. - * + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. * @param size in bytes + * @param id the chunk id + * @param fromPool if the chunk is formed by pool */ - Chunk(int size) { + public Chunk(int size, int id, boolean fromPool) { this.size = size; + this.id = id; + this.fromPool = fromPool; + } + + int getId() { + return this.id; + } + + boolean isFromPool() { + return this.fromPool; } /** @@ -60,7 +90,24 @@ public abstract class Chunk { * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * until the allocation is complete. */ - public abstract void init(); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + allocateDataBuffer(); + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + // Move 8 bytes since the first 8 bytes are having the chunkid in it + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } + + abstract void allocateDataBuffer(); /** * Reset the offset to UNINITIALIZED before before reusing an old chunk @@ -74,7 +121,8 @@ public abstract class Chunk { /** * Try to allocate <code>size</code> bytes from the chunk. - * + * If a chunk is tried to get allocated before init() call, the thread doing the allocation + * will be in busy-wait state as it will keep looping till the nextFreeOffset is set. * @return the offset of the successful allocation, or -1 to indicate not-enough-space */ public int alloc(int size) { @@ -96,7 +144,7 @@ public abstract class Chunk { if (oldOffset + size > data.capacity()) { return -1; // alloc doesn't fit } - + // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset // Try to atomically claim this chunk if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { // we got the alloc http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java new file mode 100644 index 0000000..073fb25 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -0,0 +1,404 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.ref.SoftReference; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class ChunkCreator { + private final Log LOG = LogFactory.getLog(ChunkCreator.class); + // monotonically increasing chunkid + private AtomicInteger chunkID = new AtomicInteger(1); + // maps the chunk against the monotonically increasing chunk id. We need to preserve the + // natural ordering of the key + // CellChunkMap creation should convert the soft ref to hard reference + private Map<Integer, SoftReference<Chunk>> chunkIdMap = + new ConcurrentHashMap<Integer, SoftReference<Chunk>>(); + private final int chunkSize; + private final boolean offheap; + @VisibleForTesting + static ChunkCreator INSTANCE; + @VisibleForTesting + static boolean chunkPoolDisabled = false; + private MemStoreChunkPool pool; + + @VisibleForTesting + ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + this.chunkSize = chunkSize; + this.offheap = offheap; + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + if (heapMemoryManager != null && this.pool != null) { + // Register with Heap Memory manager + heapMemoryManager.registerTuneObserver(this.pool); + } + } + + /** + * Initializes the instance of MSLABChunkCreator + * @param chunkSize the chunkSize + * @param offheap indicates if the chunk is to be created offheap or not + * @param globalMemStoreSize the global memstore size + * @param poolSizePercentage pool size percentage + * @param initialCountPercentage the initial count of the chunk pool if any + * @param heapMemoryManager the heapmemory manager + * @return singleton MSLABChunkCreator + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", + justification = "Method is called by single thread at the starting of RS") + @VisibleForTesting + public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize, + float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + if (INSTANCE != null) return INSTANCE; + INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, heapMemoryManager); + return INSTANCE; + } + + static ChunkCreator getInstance() { + return INSTANCE; + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + */ + Chunk getChunk() { + Chunk chunk = null; + if (pool != null) { + // the pool creates the chunk internally. The chunk#init() call happens here + chunk = this.pool.getChunk(); + // the pool has run out of maxCount + if (chunk == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount() + + ". Creating chunk onheap."); + } + } + } + if (chunk == null) { + chunk = createChunk(); + } + // put this chunk into the chunkIdMap + this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, + // else only the offset is set to the beginning of the chunk to accept allocations + chunk.init(); + return chunk; + } + + private Chunk createChunk() { + return createChunk(false); + } + + /** + * Creates the chunk either onheap or offheap + * @param pool indicates if the chunks have to be created which will be used by the Pool + * @return the chunk + */ + private Chunk createChunk(boolean pool) { + int id = chunkID.getAndIncrement(); + assert id > 0; + // do not create offheap chunk on demand + if (pool && this.offheap) { + return new OffheapChunk(chunkSize, id, pool); + } else { + return new OnheapChunk(chunkSize, id, pool); + } + } + + @VisibleForTesting + // TODO : To be used by CellChunkMap + Chunk getChunk(int id) { + SoftReference<Chunk> ref = chunkIdMap.get(id); + if (ref != null) { + return ref.get(); + } + return null; + } + + int getChunkSize() { + return this.chunkSize; + } + + boolean isOffheap() { + return this.offheap; + } + + private void removeChunks(Set<Integer> chunkIDs) { + this.chunkIdMap.keySet().removeAll(chunkIDs); + } + + Chunk removeChunk(int chunkId) { + SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId); + if (ref != null) { + return ref.get(); + } + return null; + } + + @VisibleForTesting + int size() { + return this.chunkIdMap.size(); + } + + @VisibleForTesting + void clearChunkIds() { + this.chunkIdMap.clear(); + } + + /** + * A pool of {@link Chunk} instances. + * + * MemStoreChunkPool caches a number of retired chunks for reusing, it could + * decrease allocating bytes when writing, thereby optimizing the garbage + * collection on JVM. + */ + private class MemStoreChunkPool implements HeapMemoryTuneObserver { + private int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue<Chunk> reclaimedChunks; + private final float poolSizePercentage; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private final AtomicLong chunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); + + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + this.maxCount = maxCount; + this.poolSizePercentage = poolSizePercentage; + this.reclaimedChunks = new LinkedBlockingQueue<>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = createChunk(true); + chunk.init(); + reclaimedChunks.add(chunk); + } + chunkCount.set(initialCount); + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. + * @return a chunk + * @see #putbackChunks(Set) + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk != null) { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } else { + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.chunkCount.get(); + if (created < this.maxCount) { + if (this.chunkCount.compareAndSet(created, created + 1)) { + chunk = createChunk(true); + break; + } + } else { + break; + } + } + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining + * chunks + * @param chunks + */ + private void putbackChunks(Set<Integer> chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); + Iterator<Integer> iterator = chunks.iterator(); + while (iterator.hasNext()) { + Integer chunkId = iterator.next(); + // remove the chunks every time though they are from the pool or not + Chunk chunk = ChunkCreator.this.removeChunk(chunkId); + if (chunk != null) { + if (chunk.isFromPool() && toAdd > 0) { + reclaimedChunks.add(chunk); + } + toAdd--; + } + } + } + + private class StatisticsThread extends Thread { + StatisticsThread() { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + } + + @Override + public void run() { + logStats(); + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = chunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + } + + private int getMaxCount() { + return this.maxCount; + } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (isOffheap()) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } + int newMaxCount = + (int) (newMemstoreSize * poolSizePercentage / getChunkSize()); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + + private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage) { + if (poolSizePercentage <= 0) { + LOG.info("PoolSizePercentage is less than 0. So not using pool"); + return null; + } + if (chunkPoolDisabled) { + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize()); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount + + ", initial count " + initialCount); + return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage); + } + + @VisibleForTesting + int getMaxCount() { + if (pool != null) { + return pool.getMaxCount(); + } + return 0; + } + + @VisibleForTesting + int getPoolSize() { + if (pool != null) { + return pool.reclaimedChunks.size(); + } + return 0; + } + + /* + * Only used in testing + */ + @VisibleForTesting + void clearChunksInPool() { + if (pool != null) { + pool.reclaimedChunks.clear(); + } + } + + synchronized void putbackChunks(Set<Integer> chunks) { + if (pool != null) { + pool.putbackChunks(chunks); + } else { + this.removeChunks(chunks); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b3b5113..41eb0a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); // Call it after starting HeapMemoryManager. - initializeMemStoreChunkPool(); + initializeMemStoreChunkCreator(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements } } - private void initializeMemStoreChunkPool() { + protected void initializeMemStoreChunkCreator() { if (MemStoreLAB.isEnabled(conf)) { // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from @@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, - initialCountPercentage, chunkSize, offheap); - if (pool != null && this.hMemManager != null) { - // Register with Heap Memory manager - this.hMemManager.registerTuneObserver(pool); - } + // init the chunkCreator + ChunkCreator chunkCreator = + ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, this.hMemManager); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java deleted file mode 100644 index b7ac212..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A pool of {@link Chunk} instances. - * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. - * - * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} - * - * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating - * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called - * when MemStore clearing snapshot for flush - */ -@SuppressWarnings("javadoc") -@InterfaceAudience.Private -public class MemStoreChunkPool implements HeapMemoryTuneObserver { - private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); - - // Static reference to the MemStoreChunkPool - static MemStoreChunkPool GLOBAL_INSTANCE; - /** Boolean whether we have disabled the memstore chunk pool entirely. */ - static boolean chunkPoolDisabled = false; - - private int maxCount; - - // A queue of reclaimed chunks - private final BlockingQueue<Chunk> reclaimedChunks; - private final int chunkSize; - private final float poolSizePercentage; - - /** Statistics thread schedule pool */ - private final ScheduledExecutorService scheduleThreadPool; - /** Statistics thread */ - private static final int statThreadPeriod = 60 * 5; - private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); - private final boolean offheap; - - MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, - boolean offheap) { - this.maxCount = maxCount; - this.chunkSize = chunkSize; - this.poolSizePercentage = poolSizePercentage; - this.offheap = offheap; - this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize); - chunk.init(); - reclaimedChunks.add(chunk); - } - chunkCount.set(initialCount); - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, - statThreadPeriod, TimeUnit.SECONDS); - } - - /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have - * not yet created max allowed chunks count. When we have already created max allowed chunks and - * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk - * then. - * Note: Chunks returned by this pool must be put back to the pool after its use. - * @return a chunk - * @see #putbackChunk(Chunk) - * @see #putbackChunks(BlockingQueue) - */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk != null) { - chunk.reset(); - reusedChunkCount.incrementAndGet(); - } else { - // Make a chunk iff we have not yet created the maxCount chunks - while (true) { - long created = this.chunkCount.get(); - if (created < this.maxCount) { - chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize); - if (this.chunkCount.compareAndSet(created, created + 1)) { - break; - } - } else { - break; - } - } - } - return chunk; - } - - /** - * Add the chunks to the pool, when the pool achieves the max size, it will - * skip the remaining chunks - * @param chunks - */ - synchronized void putbackChunks(BlockingQueue<Chunk> chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Chunk chunk = null; - while ((chunk = chunks.poll()) != null && toAdd > 0) { - reclaimedChunks.add(chunk); - toAdd--; - } - } - - /** - * Add the chunk to the pool, if the pool has achieved the max size, it will - * skip it - * @param chunk - */ - synchronized void putbackChunk(Chunk chunk) { - if (reclaimedChunks.size() < this.maxCount) { - reclaimedChunks.add(chunk); - } - } - - int getPoolSize() { - return this.reclaimedChunks.size(); - } - - /* - * Only used in testing - */ - void clearChunks() { - this.reclaimedChunks.clear(); - } - - private class StatisticsThread extends Thread { - StatisticsThread() { - super("MemStoreChunkPool.StatisticsThread"); - setDaemon(true); - } - - @Override - public void run() { - logStats(); - } - - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = chunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); - } - } - - /** - * @return the global MemStoreChunkPool instance - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", - justification = "Method is called by single thread at the starting of RS") - static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, int chunkSize, boolean offheap) { - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - if (chunkPoolDisabled) return null; - - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); - } - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, - offheap); - return GLOBAL_INSTANCE; - } - - /** - * @return The singleton instance of this pool. - */ - static MemStoreChunkPool getPool() { - return GLOBAL_INSTANCE; - } - - int getMaxCount() { - return this.maxCount; - } - - @VisibleForTesting - static void clearDisableFlag() { - chunkPoolDisabled = false; - } - - @Override - public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - // don't do any tuning in case of offheap memstore - if (this.offheap) { - LOG.warn("Not tuning the chunk pool as it is offheap"); - return; - } - int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index f6d1607..72e937c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * <p> * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from * and then doles it out to threads that request slices into the array. These chunks can get pooled - * as well. See {@link MemStoreChunkPool}. + * as well. See {@link ChunkCreator}. * <p> * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this * cell's data and copies into this area and then recreate a Cell over this copied data. * <p> - * @see MemStoreChunkPool + * @see ChunkCreator */ @InterfaceAudience.Private public interface MemStoreLAB { http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4e87135..4fba82d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,23 +18,26 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - /** * A memstore-local allocation buffer. * <p> @@ -55,8 +58,8 @@ import com.google.common.base.Preconditions; * would provide a performance improvement - probably would speed up the * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached * anyway. - * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}. - * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks, + * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. + * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks, * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be * always on heap backed. */ @@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB { static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); private AtomicReference<Chunk> curChunk = new AtomicReference<>(); - // A queue of chunks from pool contained by this memstore LAB - // TODO: in the future, it would be better to have List implementation instead of Queue, - // as FIFO order is not so important here + // Lock to manage multiple handlers requesting for a chunk + private ReentrantLock lock = new ReentrantLock(); + + // A set of chunks contained by this memstore LAB @VisibleForTesting - BlockingQueue<Chunk> pooledChunkQueue = null; + Set<Integer> chunks = new ConcurrentSkipListSet<Integer>(); private final int chunkSize; private final int maxAlloc; - private final MemStoreChunkPool chunkPool; + private final ChunkCreator chunkCreator; // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB { public MemStoreLABImpl(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = MemStoreChunkPool.getPool(); - // currently chunkQueue is only used for chunkPool - if (this.chunkPool != null) { - // set queue length to chunk pool max count to avoid keeping reference of - // too many non-reclaimable chunks - pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount()); - } - + this.chunkCreator = ChunkCreator.getInstance(); // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } - @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB { Chunk c = null; int allocOffset = 0; while (true) { + // Try to get the chunk c = getOrMakeChunk(); + // we may get null because the some other thread succeeded in getting the lock + // and so the current thread has to try again to make its chunk or grab the chunk + // that the other thread created // Try to allocate from this chunk - allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - break; + if (c != null) { + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; + } + // not enough space! + // try to retire this chunk + tryRetireChunk(c); } - // not enough space! - // try to retire this chunk - tryRetireChunk(c); } - return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); + return copyToChunkCell(cell, c.getData(), allocOffset, size); + } + + /** + * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid + * out of it + */ + private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { + int tagsLen = cell.getTagsLength(); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, offset); + } else { + // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the + // other case also. The data fragments within Cell is copied into buf as in KeyValue + // serialization format only. + KeyValueUtil.appendTo(cell, buf, offset, true); + } + // TODO : write the seqid here. For writing seqId we should create a new cell type so + // that seqId is not used as the state + if (tagsLen == 0) { + // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class + // which directly return tagsLen as 0. So we avoid parsing many length components in + // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell + // call getTagsLength(). + return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } else { + return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } } /** @@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB { this.closed = true; // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - if (chunkPool != null && openScannerCount.get() == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + int count = openScannerCount.get(); + if(count == 0) { + recycleChunks(); } } @@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (this.closed && chunkPool != null && count == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + if (this.closed && count == 0) { + recycleChunks(); + } + } + + private void recycleChunks() { + if (reclaimed.compareAndSet(false, true)) { + chunkCreator.putbackChunks(chunks); } } @@ -190,45 +224,33 @@ public class MemStoreLABImpl implements MemStoreLAB { * allocate a new one from the JVM. */ private Chunk getOrMakeChunk() { - while (true) { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - if (chunkPool != null) { - c = chunkPool.getChunk(); - } - boolean pooledChunk = false; - if (c != null) { - // This is chunk from pool - pooledChunk = true; - } else { - c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap. - } - if (curChunk.compareAndSet(null, c)) { - // we won race - now we need to actually do the expensive - // allocation step - c.init(); - if (pooledChunk) { - if (!this.closed && !this.pooledChunkQueue.offer(c)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " - + pooledChunkQueue.size()); - } - } + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; + } + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (lock.tryLock()) { + try { + // once again check inside the lock + c = curChunk.get(); + if (c != null) { + return c; } - return c; - } else if (pooledChunk) { - chunkPool.putbackChunk(c); + c = this.chunkCreator.getChunk(); + if (c != null) { + // set the curChunk. No need of CAS as only one thread will be here + curChunk.set(c); + chunks.add(c.getId()); + return c; + } + } finally { + lock.unlock(); } - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. } + return null; } @VisibleForTesting @@ -236,8 +258,15 @@ public class MemStoreLABImpl implements MemStoreLAB { return this.curChunk.get(); } - + @VisibleForTesting BlockingQueue<Chunk> getPooledChunks() { - return this.pooledChunkQueue; + BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>(); + for (Integer id : this.chunks) { + Chunk chunk = chunkCreator.getChunk(id); + if (chunk != null && chunk.isFromPool()) { + pooledChunks.add(chunk); + } + } + return pooledChunks; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java new file mode 100644 index 0000000..a8ba50c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags + * @see MemStoreLAB + */ +@InterfaceAudience.Private +public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue { + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public int getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toInt(buf, 0); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java index ed98cfa..e244a33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java @@ -21,34 +21,27 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An off heap chunk implementation. */ @InterfaceAudience.Private public class OffheapChunk extends Chunk { - OffheapChunk(int size) { - super(size); + OffheapChunk(int size, int id) { + // better if this is always created fromPool. This should not be called + super(size, id); + } + + OffheapChunk(int size, int id, boolean fromPool) { + super(size, id, fromPool); + assert fromPool == true; } @Override - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocateDirect(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocateDirect(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java index bd33cb5..da34e24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java @@ -21,33 +21,25 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An on heap chunk implementation. */ @InterfaceAudience.Private public class OnheapChunk extends Chunk { - OnheapChunk(int size) { - super(size); + OnheapChunk(int size, int id) { + super(size, id); + } + + OnheapChunk(int size, int id, boolean fromPool) { + super(size, id, fromPool); } - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocate(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + @Override + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocate(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 82c2eab..6563122 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -2426,6 +2428,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); WAL wal = createWal(conf, rootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 422c54b..8d8b6df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -49,8 +50,10 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -397,6 +400,7 @@ public class TestCoprocessorInterface { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(tableName, null, null, false); Path path = new Path(DIR + callingMethod); Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 80d0e3a..b99087d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -47,10 +47,12 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -152,6 +154,7 @@ public class TestRegionObserverScannerOpenHook { for (byte[] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); WAL wal = HBaseTestingUtility.createWal(conf, path, info); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java index 2e44dee..15d449d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; @@ -100,6 +102,7 @@ public class TestRegionObserverStacking extends TestCase { for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); Path path = new Path(DIR + callingMethod); HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java index f1775d0..fae7247 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java @@ -40,8 +40,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -342,6 +344,7 @@ public class TestScannerFromBucketCache { private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly, byte[]... families) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log"); HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index cc73d9d..32bce26 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Triple; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -88,6 +91,10 @@ public class TestCatalogJanitor { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setup() throws Exception { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + } /** * Mock MasterServices for tests below. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 418aadf..096c5ef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -241,7 +241,7 @@ public class TestBulkLoad { for (byte[] family : families) { hTableDescriptor.addFamily(new HColumnDescriptor(family)); } - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); // TODO We need a way to do this without creating files return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 3b4d068..09877b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase { descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - MemStoreChunkPool.chunkPoolDisabled = false; + ChunkCreator.chunkPoolDisabled = false; } /* Create and test CellSet based on CellArrayMap */ http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index a888c45..9e90f3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - protected static MemStoreChunkPool chunkPool; + protected static ChunkCreator chunkCreator; protected HRegion region; protected RegionServicesForStores regionServicesForStores; protected HStore store; @@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @After public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override @@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); - this.region = hbaseUtility.createTestRegion("foobar", hcd); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); + htd.addFamily(hcd); + HRegionInfo info = + new HRegionInfo(TableName.valueOf("foobar"), null, null, false); + WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); + this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); + //this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); } /** @@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val), null); assertEquals(3, memstore.getActive().getCellsCount()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } ////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 5a48455..66e107a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -44,17 +44,13 @@ import java.util.List; public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); - //private static MemStoreChunkPool chunkPool; - //private HRegion region; - //private RegionServicesForStores regionServicesForStores; - //private HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// @Override public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override public void setUp() throws Exception { @@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java index 8e85730..e320368 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java @@ -164,6 +164,7 @@ public class TestCompactionArchiveConcurrentClose { HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java index 89b2368..e7fcf18 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java @@ -174,6 +174,7 @@ public class TestCompactionArchiveIOException { private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info) throws IOException { Configuration conf = testUtil.getConfiguration(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName()); Path regionDir = new Path(tableDir, info.getEncodedName()); Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString()); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java index 7154511..bff5bec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -104,6 +104,7 @@ public class TestCompactionPolicy { HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); hlog = new FSHLog(fs, basedir, logName, conf); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); region = HRegion.createHRegion(info, basedir, conf, htd, hlog); region.close(); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7434eb1..41b304b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +85,7 @@ public class TestDefaultMemStore { protected static final byte[] FAMILY = Bytes.toBytes("column"); protected MultiVersionConcurrencyControl mvcc; protected AtomicLong startSeqNum = new AtomicLong(0); + protected ChunkCreator chunkCreator; private String getName() { return this.name.getMethodName(); @@ -92,9 +94,17 @@ public class TestDefaultMemStore { @Before public void setUp() throws Exception { internalSetUp(); + // no pool + this.chunkCreator = + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); this.memstore = new DefaultMemStore(); } + @AfterClass + public static void tearDownClass() throws Exception { + ChunkCreator.getInstance().clearChunkIds(); + } + protected void internalSetUp() throws Exception { this.mvcc = new MultiVersionConcurrencyControl(); } @@ -129,7 +139,9 @@ public class TestDefaultMemStore { assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { - assertEquals(2 * Segment.getCellLength(kv), + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG, ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 73fb9cf..24e850d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -266,6 +266,7 @@ public class TestFailedAppendAndSync { */ public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index b416c7d..0f24a24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -153,7 +153,7 @@ public class TestHMobStore { htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); final WALFactory wals = new WALFactory(walConf, null, methodName);