Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.5 e36d2333c -> 4651ac735


Add backpressure to compressed commit log

patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9995521f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9995521f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9995521f

Branch: refs/heads/cassandra-3.5
Commit: 9995521fb9b3f510ee9c7012d75e6970ec7d5fb7
Parents: 0a5e220
Author: Ariel Weisberg <ariel.weisb...@datastax.com>
Authored: Wed Mar 16 18:14:52 2016 +0100
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Wed Mar 16 18:14:52 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/commitlog/AbstractCommitLogService.java  |   1 +
 .../db/commitlog/CommitLogSegment.java          |  19 +++-
 .../db/commitlog/CommitLogSegmentManager.java   |  11 +-
 .../db/commitlog/CompressedSegment.java         |  39 ++++++--
 .../commitlog/CommitLogSegmentManagerTest.java  | 100 +++++++++++++++++++
 6 files changed, 159 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 28de247..b264609 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Add backpressure to compressed commit log (CASSANDRA-10971)
  * SSTableExport supports secondary index tables (CASSANDRA-11330)
  * Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
  * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 557bf50..113d1ba 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService
 
                         // sync and signal
                         long syncStarted = System.currentTimeMillis();
+                        // This is a target for Byteman in CommitLogSegmentManagerTest
                         commitLog.sync(shutdown);
                         lastSyncedAt = syncStarted;
                         syncComplete.signalAll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5dd7c9f..0e9f502 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -117,9 +117,20 @@ public abstract class CommitLogSegment
     final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
-    static CommitLogSegment createSegment(CommitLog commitLog)
+    static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
     {
-        return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+        return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
+    }
+
+    /**
+     * Checks if the segments use a buffer pool.
+     *
+     * @param commitLog the commit log
+     * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+     */
+    static boolean usesBufferPool(CommitLog commitLog)
+    {
+        return commitLog.compressor != null;
     }
 
     static long getNextId()
@@ -148,7 +159,7 @@ public abstract class CommitLogSegment
         {
             throw new FSWriteError(e, logFile);
         }
-        
+
         buffer = createBuffer(commitLog);
         // write the header
         CommitLogDescriptor.writeHeader(buffer, descriptor);
@@ -255,7 +266,7 @@ public abstract class CommitLogSegment
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
         // this will always be entered when a mutation allocation has been attempted after the marker allocation
-        // succeeded in the previous sync. 
+        // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
         int startMarker = lastSyncedOffset;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 564652f..8a8d0e7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -114,11 +114,11 @@ public class CommitLogSegmentManager
                         if (task == null)
                         {
                             // if we have no more work to do, check if we should create a new segment
-                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
                             {
                                 logger.trace("No segments in reserve; creating a fresh one");
                                 // TODO : some error handling in case we fail to create a new segment
-                                availableSegments.add(CommitLogSegment.createSegment(commitLog));
+                                availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
                                 hasAvailableSegments.signalAll();
                             }
 
@@ -163,6 +163,12 @@ public class CommitLogSegmentManager
                     }
                 }
             }
+
+            private boolean atSegmentLimit()
+            {
+                return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
+            }
+
         };
 
         run = true;
@@ -553,5 +559,6 @@ public class CommitLogSegmentManager
     {
         return Collections.unmodifiableCollection(activeSegments);
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index aa12e1d..0ec0bca 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
@@ -44,6 +45,12 @@ public class CompressedSegment extends CommitLogSegment
     static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
 
     /**
+     * The number of buffers currently in use (incremented in createBuffer, decremented in internalClose).
+     */
+    private static AtomicInteger usedBuffers = new AtomicInteger(0);
+
+
+    /**
      * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
      * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
      * more, depending on how soon the sync policy stops all writing threads.
@@ -52,16 +59,18 @@ public class CompressedSegment extends CommitLogSegment
 
     static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
     final ICompressor compressor;
+    final Runnable onClose;
 
     volatile long lastWrittenPos = 0;
 
     /**
      * Constructs a new segment file.
      */
-    CompressedSegment(CommitLog commitLog)
+    CompressedSegment(CommitLog commitLog, Runnable onClose)
     {
         super(commitLog);
         this.compressor = commitLog.compressor;
+        this.onClose = onClose;
         try
         {
             channel.write((ByteBuffer) buffer.duplicate().flip());
@@ -80,6 +89,7 @@ public class CompressedSegment extends CommitLogSegment
 
     ByteBuffer createBuffer(CommitLog commitLog)
     {
+        usedBuffers.incrementAndGet();
         ByteBuffer buf = bufferPool.poll();
         if (buf == null)
         {
@@ -138,12 +148,29 @@ public class CompressedSegment extends CommitLogSegment
     @Override
     protected void internalClose()
     {
-        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
-            bufferPool.add(buffer);
-        else
-            FileUtils.clean(buffer);
+        usedBuffers.decrementAndGet();
+        try {
+            if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+                bufferPool.add(buffer);
+            else
+                FileUtils.clean(buffer);
+            super.internalClose();
+        }
+        finally
+        {
+            onClose.run();
+        }
+    }
 
-        super.internalClose();
+    /**
+     * Checks if the number of buffers in use is greater than or equal to the maximum number of buffers allowed in the pool.
+     *
+     * @return <code>true</code> if the number of buffers in use is greater than or equal to the maximum number of buffers
+     * allowed in the pool, <code>false</code> otherwise.
+     */
+    static boolean hasReachedPoolLimit()
+    {
+        return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
     }
 
     static void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
new file mode 100644
index 0000000..b5c2f41
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -0,0 +1,100 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import javax.naming.ConfigurationException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+
+@RunWith(BMUnitRunner.class)
+public class CommitLogSegmentManagerTest
+{
+    //Block commit log service from syncing
+    private static final Semaphore allowSync = new Semaphore(0);
+
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "Standard1";
+    private static final String STANDARD2 = "Standard2";
+
+    private final static byte[] entropy = new byte[1024 * 256];
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        new Random().nextBytes(entropy);
+        DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+        DatabaseDescriptor.setCommitLogSegmentSize(1);
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @Test
+    @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
+            targetClass = "AbstractCommitLogService$1",
+            targetMethod = "run",
+            targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+            action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
+    public void testCompressedCommitLogBackpressure() throws Throwable
+    {
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                     .clustering("bytes")
+                     .add("val", ByteBuffer.wrap(entropy))
+                     .build();
+
+        Thread dummyThread = new Thread( () ->
+        {
+            for (int i = 0; i < 20; i++)
+                CommitLog.instance.add(m);
+        });
+        dummyThread.start();
+
+        CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+
+        //Protect against delay, but still break out as fast as possible
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 5000)
+        {
+            if (clsm.getActiveSegments().size() >= 3)
+                break;
+        }
+        Thread.sleep(1000);
+
+        // Should only be able to create 3 segments, not 7, because it blocks waiting for truncation that never comes
+        Assert.assertEquals(3, clsm.getActiveSegments().size());
+
+        clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
+
+        Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
+    }
+}
\ No newline at end of file

Reply via email to