Repository: cassandra Updated Branches: refs/heads/trunk 52d5eb04f -> 1c41a9ac2
bound maximum in-flight commit log replay mutation bytes to 64 megabytes (tunable via cassandra.commitlog_max_outstanding_replay_bytes) Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-8639 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c41a9ac Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c41a9ac Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c41a9ac Branch: refs/heads/trunk Commit: 1c41a9ac2c147ed111d9d8fba53652707dac7df0 Parents: 52d5eb0 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Tue Nov 24 15:17:10 2015 -0500 Committer: T Jake Luciani <j...@apache.org> Committed: Fri Dec 4 11:49:41 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 +- .../db/commitlog/CommitLogReplayer.java | 131 +++++++++++------- .../cassandra/db/RecoveryManagerTest.java | 137 +++++++++++++++++++ 4 files changed, 222 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1607a66..6c46183 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.2 + * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639) * Normalize all scripts (CASSANDRA-10679) * Make compression ratio much more accurate (CASSANDRA-10225) * Optimize building of Clustering object when only one is created (CASSANDRA-10409) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 2a5970d..8830c99 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,7 +18,8 @@ using the provided 'sstableupgrade' tool. 
New features ------------ - + - Commit log replay now bounds the in-flight mutation bytes to 64 megabytes, + tunable via the cassandra.commitlog_max_outstanding_replay_bytes system property. - Support for type casting has been added to the selection clause. Upgrading http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 2668bba..5010696 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -29,6 +29,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; @@ -54,7 +55,6 @@ import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -65,13 +65,17 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; public class CommitLogReplayer { + @VisibleForTesting + public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64); + @VisibleForTesting + public static MutationInitiator mutationInitiator = new MutationInitiator(); static final String IGNORE_REPLAY_ERRORS_PROPERTY = 
"cassandra.commitlog.ignorereplayerrors"; private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024); private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; private final Set<Keyspace> keyspacesRecovered; - private final List<Future<?>> futures; + private final Queue<Future<Integer>> futures; private final Map<UUID, AtomicInteger> invalidMutations; private final AtomicInteger replayedCount; private final Map<UUID, ReplayPosition> cfPositions; @@ -79,14 +83,74 @@ public class CommitLogReplayer private final CRC32 checksum; private byte[] buffer; private byte[] uncompressedBuffer; + private long pendingMutationBytes = 0; private final ReplayFilter replayFilter; private final CommitLogArchiver archiver; + /* + * Wrapper around initiating mutations read from the log to make it possible + * to spy on initiated mutations for test + */ + @VisibleForTesting + public static class MutationInitiator + { + protected Future<Integer> initiateMutation(final Mutation mutation, + final long segmentId, + final int serializedSize, + final long entryLocation, + final CommitLogReplayer clr) + { + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws IOException + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (clr.pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. 
+ Mutation newMutation = null; + for (PartitionUpdate update : clr.replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(update.metadata().cfId) == null) + continue; // dropped + + ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position)) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(update); + clr.replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation); + clr.keyspacesRecovered.add(keyspace); + } + } + }; + return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize); + } + } + CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); - this.futures = new ArrayList<Future<?>>(); + this.futures = new ArrayDeque<Future<Integer>>(); this.buffer = new byte[4096]; this.uncompressedBuffer = new byte[4096]; this.invalidMutations = new HashMap<UUID, AtomicInteger>(); @@ -163,6 +227,8 @@ public class CommitLogReplayer // flush replayed keyspaces futures.clear(); boolean flushingSystem = false; + + List<Future<?>> futures = new ArrayList<Future<?>>(); for (Keyspace keyspace : keyspacesRecovered) { if (keyspace.getName().equals(SystemKeyspace.NAME)) @@ -176,6 +242,7 @@ public class CommitLogReplayer futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush()); FBUtilities.waitOnFutures(futures); + return replayedCount.get(); } @@ -565,53 +632,19 @@ public class CommitLogReplayer if (logger.isTraceEnabled()) logger.trace("replaying 
mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws IOException - { - if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) - return; - if (pointInTimeExceeded(mutation)) - return; - - final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - - // Rebuild the mutation, omitting column families that - // a) the user has requested that we ignore, - // b) have already been flushed, - // or c) are part of a cf that was dropped. - // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. - Mutation newMutation = null; - for (PartitionUpdate update : replayFilter.filter(mutation)) - { - if (Schema.instance.getCF(update.metadata().cfId) == null) - continue; // dropped - - ReplayPosition rp = cfPositions.get(update.metadata().cfId); - - // replay if current segment is newer than last flushed one or, - // if it is the last known segment, if we are after the replay position - if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position)) - { - if (newMutation == null) - newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); - newMutation.add(update); - replayedCount.incrementAndGet(); - } - } - if (newMutation != null) - { - assert !newMutation.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation); - keyspacesRecovered.add(keyspace); - } - } - }; - futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); - if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) + pendingMutationBytes += size; + futures.offer(mutationInitiator.initiateMutation(mutation, + desc.id, + size, + entryLocation, + this)); + //If there are finished mutations, or too many outstanding bytes/mutations + //drain the futures in the queue + while (futures.size() > 
MAX_OUTSTANDING_REPLAY_COUNT + || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES + || (!futures.isEmpty() && futures.peek().isDone())) { - FBUtilities.waitOnFutures(futures); - futures.clear(); + pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java index baf9466..788757c 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java @@ -20,7 +20,12 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.Date; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +50,71 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogArchiver; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.db.commitlog.CommitLogReplayer; @RunWith(OrderedJUnit4ClassRunner.class) public class RecoveryManagerTest { private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class); + static final Semaphore blocker = new Semaphore(0); + static final Semaphore blocked = new Semaphore(0); + static CommitLogReplayer.MutationInitiator originalInitiator = null; + static final CommitLogReplayer.MutationInitiator mockInitiator = new CommitLogReplayer.MutationInitiator() + { + @Override + protected Future<Integer> initiateMutation(final Mutation 
mutation, + final long segmentId, + final int serializedSize, + final long entryLocation, + final CommitLogReplayer clr) + { + final Future<Integer> toWrap = super.initiateMutation(mutation, + segmentId, + serializedSize, + entryLocation, + clr); + return new Future<Integer>() + { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() + { + return blocker.availablePermits() > 0 && toWrap.isDone(); + } + + @Override + public Integer get() throws InterruptedException, ExecutionException + { + System.out.println("Got blocker once"); + blocked.release(); + blocker.acquire(); + return toWrap.get(); + } + + @Override + public Integer get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + blocked.release(); + blocker.tryAcquire(1, timeout, unit); + return toWrap.get(timeout, unit); + } + + }; + } + }; private static final String KEYSPACE1 = "RecoveryManagerTest1"; private static final String CF_STANDARD1 = "Standard1"; @@ -86,6 +151,78 @@ public class RecoveryManagerTest } @Test + public void testRecoverBlocksOnBytesOutstanding() throws Exception + { + long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES; + CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1; + CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator; + CommitLogReplayer.mutationInitiator = mockInitiator; + try + { + CommitLog.instance.resetUnsafe(true); + Keyspace keyspace1 = Keyspace.open(KEYSPACE1); + Keyspace keyspace2 = Keyspace.open(KEYSPACE2); + + UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti") + .clustering("col1").add("val", "1") + .build()); + + UnfilteredRowIterator upd2 = Util.apply(new 
RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti") + .clustering("col2").add("val", "1") + .build()); + + keyspace1.getColumnFamilyStore("Standard1").clearUnsafe(); + keyspace2.getColumnFamilyStore("Standard3").clearUnsafe(); + + DecoratedKey dk = Util.dk("keymulti"); + Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty()); + Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty()); + + final AtomicReference<Throwable> err = new AtomicReference<Throwable>(); + Thread t = new Thread() { + @Override + public void run() + { + try + { + CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL + } + catch (Throwable t) + { + err.set(t); + } + } + }; + t.start(); + Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS)); + Thread.sleep(100); + Assert.assertTrue(t.isAlive()); + blocker.release(Integer.MAX_VALUE); + t.join(20 * 1000); + + if (err.get() != null) + throw new RuntimeException(err.get()); + + if (t.isAlive()) + { + Throwable toPrint = new Throwable(); + toPrint.setStackTrace(Thread.getAllStackTraces().get(t)); + toPrint.printStackTrace(System.out); + } + Assert.assertFalse(t.isAlive()); + + Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator())); + Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator())); + } + finally + { + CommitLogReplayer.mutationInitiator = originalInitiator; + CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding; + } + } + + + @Test public void testOne() throws IOException { CommitLog.instance.resetUnsafe(true);