Fail repair if a participant dies during sync or anticompaction. Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior, introduced by CASSANDRA-3569, of repair relying exclusively on TCP keep-alive to detect node failures during sync. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727 Branch: refs/heads/cassandra-3.X Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f Parents: ebf3507 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Mon Nov 14 18:56:16 2016 -0200 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Nov 16 15:20:46 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++-- .../apache/cassandra/repair/RepairSession.java | 13 ---- .../cassandra/service/ActiveRepairService.java | 30 ++++++-- 4 files changed, 99 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 98c1839..3482052 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.9 + * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863) * Fix Util.spinAssertEquals (CASSANDRA-12283) * Fix potential NPE for compactionstats (CASSANDRA-12462) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java index 
8ecae23..c5e066d 100644 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@ -22,30 +22,43 @@ import java.net.InetAddress; import java.util.Collection; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.AbstractFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.IFailureDetectionEventListener; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.AnticompactionRequest; import org.apache.cassandra.utils.CassandraVersion; -public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable +public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber, + IFailureDetectionEventListener { /* * Version that anticompaction response is not supported up to. * If Cassandra version is more than this, we need to wait for anticompaction response. 
*/ private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5"); + private static final Logger logger = LoggerFactory.getLogger(AnticompactionTask.class); private final UUID parentSession; private final InetAddress neighbor; private final Collection<Range<Token>> successfulRanges; + private final AtomicBoolean isFinished = new AtomicBoolean(false); public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges) { @@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R } else { - MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); // immediately return after sending request - set(neighbor); + MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); + maybeSetResult(neighbor); } } else { - setException(new IOException(neighbor + " is down")); + maybeSetException(new IOException(neighbor + " is down")); + } + } + + private boolean maybeSetException(Throwable t) + { + if (isFinished.compareAndSet(false, true)) + { + setException(t); + return true; } + return false; + } + + private boolean maybeSetResult(InetAddress o) + { + if (isFinished.compareAndSet(false, true)) + { + set(o); + return true; + } + return false; } /** * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/ - public static class AnticompactionCallback implements IAsyncCallbackWithFailure + public class AnticompactionCallback implements IAsyncCallbackWithFailure { final AnticompactionTask task; @@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R public void response(MessageIn msg) { - task.set(msg.from); + maybeSetResult(msg.from); } public boolean isLatencyForSnitch() @@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R public void onFailure(InetAddress from) { - task.setException(new RuntimeException("Anticompaction failed or timed out in " + from)); + maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from)); + } + } + + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) + { + if (!neighbor.equals(endpoint)) + return; + + // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. 
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint)); + if (maybeSetException(exception)) + { + // Though unlikely, it is possible to arrive here multiple times, and we want to avoid printing the error message twice + logger.error("[repair #{}] Endpoint {} died during anti-compaction", parentSession, endpoint, exception); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index a2dcdd1..70bfaa6 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -23,7 +23,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; @@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final Set<InetAddress> endpoints; private final long repairedAt; - // number of validations left to be performed - private final AtomicInteger validationRemaining; - private final AtomicBoolean isFailed = new AtomicBoolean(false); // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) @@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.range = range; this.endpoints = endpoints; this.repairedAt = repairedAt; - this.validationRemaining = new AtomicInteger(cfnames.length); } public UUID getId() @@ -181,14 +176,6 @@ public class
RepairSession extends AbstractFuture<RepairSessionResult> implement logger.info("[repair #{}] {}", getId(), message); Tracing.traceRepair(message); task.treeReceived(tree); - - // Unregister from FailureDetector once we've completed synchronizing Merkle trees. - // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. - // See CASSANDRA-3569 - if (validationRemaining.decrementAndGet() == 0) - { - FailureDetector.instance.unregisterFailureDetectionEventListener(this); - } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 38804b3..7d56e4b 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -29,6 +29,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai sessions.put(session.getId(), session); // register listeners - gossiper.register(session); - failureDetector.registerFailureDetectionEventListener(session); + registerOnFdAndGossip(session); - // unregister listeners at completion + // remove session at completion session.addListener(new Runnable() { /** @@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai */ public void run() { - 
failureDetector.unregisterFailureDetectionEventListener(session); - gossiper.unregister(session); sessions.remove(session.getId()); } }, MoreExecutors.sameThreadExecutor()); @@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return session; } + private <T extends AbstractFuture<?> & + IEndpointStateChangeSubscriber & + IFailureDetectionEventListener> void registerOnFdAndGossip(final T task) + { + gossiper.register(task); + failureDetector.registerFailureDetectionEventListener(task); + + // unregister listeners at completion + task.addListener(new Runnable() + { + /** + * When the task completes, unregister it from the failure detector and gossiper + */ + public void run() + { + failureDetector.unregisterFailureDetectionEventListener(task); + gossiper.unregister(task); + } + }, MoreExecutors.sameThreadExecutor()); + } + public synchronized void terminateSessions() { Throwable cause = new IOException("Terminate session is called"); @@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai for (InetAddress neighbor : neighbors) { AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges); + registerOnFdAndGossip(task); tasks.add(task); task.run(); // 'run' is just sending message }