Avoid stalling Paxos when the paxos state expires

This commit does 2 things:
- It ignores MRCs (most recent commits) that are old enough to have expired from some nodes' paxos tables
- It ensures the same timestamp is used when reading the paxos state and when ignoring old MRCs

patch by slebresne; reviewed by jasobraown for CASSANDRA-12043


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/017ec3e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/017ec3e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/017ec3e9

Branch: refs/heads/cassandra-2.2
Commit: 017ec3e99e704db5e1a36ad153af08d6e7eca523
Parents: 2811f15
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Jun 28 15:16:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 28 +++++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  6 ++++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  6 ++---
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/paxos/PaxosState.java     | 11 ++++++--
 .../service/paxos/PrepareCallback.java          | 18 ++++++++++++-
 7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5741241..feeaded 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
  * Remove finished incoming streaming connections from MessagingService 
(CASSANDRA-11854)
  * Don't try to get sstables for non-repairing column families 
(CASSANDRA-12077)
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d4ca76f..4340d42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -296,7 +296,7 @@ public class QueryProcessor implements QueryHandler
         return QueryOptions.forInternalCalls(boundValues);
     }
 
-    private static ParsedStatement.Prepared prepareInternal(String query) 
throws RequestValidationException
+    public static ParsedStatement.Prepared prepareInternal(String query) 
throws RequestValidationException
     {
         ParsedStatement.Prepared prepared = internalStatements.get(query);
         if (prepared != null)
@@ -374,6 +374,32 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    /**
+     * A special version of executeInternal that takes as argument ({@code now}, in milliseconds) the time to use as "now" for the query.
+     * Note that this only makes sense for SELECTs, so only SELECT statements are accepted (checked by assertion); this is
+     * only useful in rare cases, e.g. when the caller needs control over the time used to decide whether data has expired.
+     */
+    public static UntypedResultSet executeInternalWithNow(long now, String 
query, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            assert prepared.statement instanceof SelectStatement;
+            SelectStatement select = (SelectStatement)prepared.statement;
+            ResultMessage result = 
select.executeInternal(internalQueryState(), makeInternalOptions(prepared, 
values), now);
+            assert result instanceof ResultMessage.Rows;
+            return 
UntypedResultSet.create(((ResultMessage.Rows)result).result);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query " + query, e);
+        }
+    }
+
     public static UntypedResultSet resultify(String query, Row row)
     {
         return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1e142e0..6351bb5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -312,8 +312,12 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options) throws RequestExecutionException, RequestValidationException
     {
+        return executeInternal(state, options, System.currentTimeMillis());
+    }
+
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options, long now) throws RequestExecutionException, RequestValidationException
+    {
         int limit = getLimit(options);
-        long now = System.currentTimeMillis();
         Pageable command = getPageableCommand(options, limit, now);
 
         int pageSize = options.getPageSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1f66b1b..f8cf1ab 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -896,10 +896,10 @@ public class SystemKeyspace
         return new Row(key, cf);
     }
 
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData 
metadata)
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData 
metadata, long now)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, 
PAXOS_CF), key, metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, 
String.format(req, PAXOS_CF), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -939,7 +939,7 @@ public class SystemKeyspace
                         proposal.update.id());
     }
 
-    private static int paxosTtl(CFMetaData metadata)
+    public static int paxosTtl(CFMetaData metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index af0693b..cddc7e9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -421,7 +421,7 @@ public class StorageProxy implements StorageProxyMBean
             // 
https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
             // Since we waited for quorum nodes, if some of them haven't seen 
the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, 
and retry.
-            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit();
+            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent 
commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..fde881b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -63,7 +63,13 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = 
SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+                // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+                // is expired or not) across nodes; otherwise we may have a window where a Most Recent Commit shows up
+                // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+                // amount of re-submission will fix this (because the node on which the commit has expired will have a
+                // tombstone that hides any re-submission). See CASSANDRA-12043 for details.
+                long now = UUIDGen.unixTimestamp(toPrepare.ballot);
+                PaxosState state = 
SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
                 if (toPrepare.isAfter(state.promised))
                 {
                     Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -98,7 +104,8 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, 
proposal.update.metadata());
+                long now = UUIDGen.unixTimestamp(proposal.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, 
proposal.update.metadata(), now);
                 if (proposal.hasBallot(state.promised.ballot) || 
proposal.isAfter(state.promised))
                 {
                     Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java 
b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..2859a69 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 {
@@ -86,8 +89,21 @@ public class PrepareCallback extends 
AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit()
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
     {
+        // In general, we need every replica that answered the prepare (a quorum) to agree on the MRC (see the
+        // comment in StorageProxy.beginAndRepairPaxos()). Basically, we need to make sure at least a quorum of nodes
+        // have learned a commit before committing a new one; otherwise that previous commit is not guaranteed to have
+        // reached a quorum, and further commits may proceed on incomplete information.
+        // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+        // keep the paxos state forever or it could grow unchecked), and we could end up in an infinite loop as
+        // explained in CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we
+        // set on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+        // propagated to all nodes. 'now' is in microseconds, like the ballot timestamp it is compared against.
+        long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000L * 1000L; // long math: >= 3h in micros overflows an int
+        if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
+            return Collections.emptySet();
+
         return Iterables.filter(commitsByReplica.keySet(), new 
Predicate<InetAddress>()
         {
             public boolean apply(InetAddress inetAddress)

Reply via email to