Merge commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d' into cassandra-3.0 * commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d': Avoid stalling Paxos when the paxos state expires
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70059726 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70059726 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70059726 Branch: refs/heads/cassandra-3.0 Commit: 70059726f08a98ea21af91ce3855bf62f6f4b652 Parents: cb4540e 6555a87 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Jun 28 15:19:08 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jun 28 15:19:57 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++- .../cql3/statements/SelectStatement.java | 6 +++++- .../org/apache/cassandra/db/SystemKeyspace.java | 12 ++++++------ .../apache/cassandra/service/StorageProxy.java | 4 +++- .../cassandra/service/paxos/PaxosState.java | 11 +++++++++-- .../cassandra/service/paxos/PrepareCallback.java | 18 +++++++++++++++++- src/java/org/apache/cassandra/utils/UUIDGen.java | 11 +++++++++++ 8 files changed, 68 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 314a93e,9f42d98..aaeafd6 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -9,38 -2,9 +9,39 @@@ Merged from 2.2 * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) * Validate bloom_filter_fp_chance against lowest supported value when the table is created (CASSANDRA-11920) - * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) +Merged from 2.1: ++ * Avoid stalling paxos when the paxos 
state expires (CASSANDRA-12043) + * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854) + * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) + * Avoid marking too many sstables as repaired (CASSANDRA-11696) + * Prevent select statements with clustering key > 64k (CASSANDRA-11882) + * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) + * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934) + * Allow LWT operation on static column with only partition keys (CASSANDRA-10532) + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) + + +3.0.7 + * Fix legacy serialization of Thrift-generated non-compound range tombstones + when communicating with 2.x nodes (CASSANDRA-11930) + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912) + * Fix sstables not being protected from removal during index build (CASSANDRA-11905) + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032) + * Remove unneeded code to repair index summaries that have + been improperly down-sampled (CASSANDRA-11127) + * Avoid WriteTimeoutExceptions during commit log replay due to materialized + view lock contention (CASSANDRA-11891) + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: * Persist local metadata 
earlier in startup sequence (CASSANDRA-11742) * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index da146ef,c702679..af94d3e --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -273,10 -288,10 +273,10 @@@ public class QueryProcessor implements AbstractType type = prepared.boundNames.get(i).type; boundValues.add(value instanceof ByteBuffer || value == null ? (ByteBuffer)value : type.decompose(value)); } - return QueryOptions.forInternalCalls(boundValues); + return QueryOptions.forInternalCalls(cl, boundValues); } - private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException + public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException { ParsedStatement.Prepared prepared = internalStatements.get(query); if (prepared != null) @@@ -343,19 -340,42 +343,34 @@@ return null; } + /** + * A special version of executeInternal that takes the time used as "now" for the query in argument. + * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare + * cases. + */ - public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values) ++ public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... 
values) + { - try - { - ParsedStatement.Prepared prepared = prepareInternal(query); - assert prepared.statement instanceof SelectStatement; - SelectStatement select = (SelectStatement)prepared.statement; - ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now); - assert result instanceof ResultMessage.Rows; - return UntypedResultSet.create(((ResultMessage.Rows)result).result); - } - catch (RequestExecutionException e) - { - throw new RuntimeException(e); - } - catch (RequestValidationException e) - { - throw new RuntimeException("Error validating query " + query, e); - } ++ ParsedStatement.Prepared prepared = prepareInternal(query); ++ assert prepared.statement instanceof SelectStatement; ++ SelectStatement select = (SelectStatement)prepared.statement; ++ ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec); ++ assert result instanceof ResultMessage.Rows; ++ return UntypedResultSet.create(((ResultMessage.Rows)result).result); + } + - public static UntypedResultSet resultify(String query, Row row) + public static UntypedResultSet resultify(String query, RowIterator partition) { - return resultify(query, Collections.singletonList(row)); + return resultify(query, PartitionIterators.singletonIterator(partition)); } - public static UntypedResultSet resultify(String query, List<Row> rows) + public static UntypedResultSet resultify(String query, PartitionIterator partitions) { - SelectStatement ss = (SelectStatement) getStatement(query, null).statement; - ResultSet cqlRows = ss.process(rows); - return UntypedResultSet.create(cqlRows); + try (PartitionIterator iter = partitions) + { + SelectStatement ss = (SelectStatement) getStatement(query, null).statement; + ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds()); + return UntypedResultSet.create(cqlRows); + } } public ResultMessage.Prepared prepare(String query, 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 0e33475,8820ff7..aca6146 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -401,33 -280,45 +401,37 @@@ public class SelectStatement implement return new ResultMessage.Rows(rset); } - static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds) - { - Keyspace keyspace = Keyspace.open(keyspaceName); - List<Row> rows = new ArrayList<Row>(cmds.size()); - for (ReadCommand cmd : cmds) - rows.add(cmd.getRow(keyspace)); - return rows; - } - public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - int nowInSec = FBUtilities.nowInSeconds(); - return executeInternal(state, options, System.currentTimeMillis()); ++ return executeInternal(state, options, FBUtilities.nowInSeconds()); + } + - public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException ++ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException + { - int limit = getLimit(options); - Pageable command = getPageableCommand(options, limit, now); + int userLimit = getLimit(options); + ReadQuery query = getQuery(options, nowInSec, userLimit); int pageSize = getPageSize(options); - if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) + try (ReadOrderGroup orderGroup = query.startOrderGroup()) { - List<Row> rows = command == null - ? Collections.<Row>emptyList() - : (command instanceof Pageable.ReadCommands - ? 
readLocally(keyspace(), ((Pageable.ReadCommands)command).commands) - : ((RangeSliceCommand)command).executeLocally()); - - return processResults(rows, options, limit, now); + if (pageSize <= 0 || query.limits().count() <= pageSize) + { + try (PartitionIterator data = query.executeInternal(orderGroup)) + { + return processResults(data, options, nowInSec, userLimit); + } + } + else + { + QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion()); + return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit); + } } - - QueryPager pager = QueryPagers.localPager(command); - return execute(pager, options, limit, now, pageSize); } - public ResultSet process(List<Row> rows) throws InvalidRequestException + public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException { - QueryOptions options = QueryOptions.DEFAULT; - return process(rows, options, getLimit(options), System.currentTimeMillis()); + return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT)); } public String keyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index c5c6abe,e0d5f66..da96b38 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -1082,10 -894,10 +1082,10 @@@ public final class SystemKeyspac return null; } - public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata) - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now) ++ public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec) { String req = "SELECT * FROM system.%s WHERE row_key = ? 
AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId); - UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS), key, metadata.cfId); ++ UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); @@@ -1110,43 -920,41 +1110,43 @@@ String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(String.format(req, PAXOS), UUIDGen.microsTimestamp(promise.ballot), - paxosTtl(promise.update.metadata()), - paxosTtl(promise.update.metadata), ++ paxosTtlSec(promise.update.metadata()), promise.ballot, - promise.key, - promise.update.id()); + promise.update.partitionKey().getKey(), + promise.update.metadata().cfId); } public static void savePaxosProposal(Commit proposal) { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS), + executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? 
AND cf_id = ?", PAXOS), UUIDGen.microsTimestamp(proposal.ballot), - paxosTtl(proposal.update.metadata()), - paxosTtl(proposal.update.metadata), ++ paxosTtlSec(proposal.update.metadata()), proposal.ballot, - proposal.update.toBytes(), - proposal.key, - proposal.update.id()); + PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), + MessagingService.current_version, + proposal.update.partitionKey().getKey(), + proposal.update.metadata().cfId); } - private static int paxosTtl(CFMetaData metadata) - public static int paxosTtl(CFMetaData metadata) ++ public static int paxosTtlSec(CFMetaData metadata) { // keep paxos state around for at least 3h - return Math.max(3 * 3600, metadata.getGcGraceSeconds()); + return Math.max(3 * 3600, metadata.params.gcGraceSeconds); } public static void savePaxosCommit(Commit commit) { // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. - String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? 
AND cf_id = ?"; executeInternal(String.format(cql, PAXOS), UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata()), - paxosTtl(commit.update.metadata), ++ paxosTtlSec(commit.update.metadata()), commit.ballot, - commit.update.toBytes(), - commit.key, - commit.update.id()); + PartitionUpdate.toBytes(commit.update, MessagingService.current_version), + MessagingService.current_version, + commit.update.partitionKey().getKey(), + commit.update.metadata().cfId); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 34d7c40,03dd209..483da67 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -31,6 -30,6 +31,7 @@@ import javax.management.ObjectName import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; ++import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@@ -439,7 -425,7 +440,8 @@@ public class StorageProxy implements St // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also // mean we lost messages), we pro-actively "repair" those nodes, and retry. 
- Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(); - Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros); ++ int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros)); ++ Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec); if (Iterables.size(missingMRC) > 0) { Tracing.trace("Repairing replicas that missed the most recent commit"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/paxos/PaxosState.java index 0b3af8f,fde881b..e01f568 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@@ -65,7 -63,13 +65,13 @@@ public class PaxosStat lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata()); + // When preparing, we need to use the same time as "now" (that's the time we use to decide if something + // is expired or not) across nodes otherwise we may have a window where a Most Recent Commit shows up + // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no + // amount of re-submit will fix this (because the node on which the commit has expired will have a + // tombstone that hides any re-submit). See CASSANDRA-12043 for details. 
- long now = UUIDGen.unixTimestamp(toPrepare.ballot); - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now); ++ int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot); ++ PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec); if (toPrepare.isAfter(state.promised)) { Tracing.trace("Promising ballot {}", toPrepare.ballot); @@@ -100,7 -104,8 +106,8 @@@ lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata()); - long now = UUIDGen.unixTimestamp(proposal.ballot); - PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now); ++ int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot); ++ PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec); if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) { Tracing.trace("Accepting proposal {}", proposal); http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 9c54b01,081f457..ff81803 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@@ -87,8 -89,21 +90,21 @@@ public class PrepareCallback extends Ab latch.countDown(); } - public Iterable<InetAddress> replicasMissingMostRecentCommit() - public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now) ++ public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec) { + // In general, we need all replicas that have answered the prepare (a quorum) to agree on the MRC (see + // comment in 
StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes + // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a + // quorum and further commits may proceed on incomplete information). + // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't + // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as + // explained on CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we set + // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been + // propagated to all nodes. - long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000 * 1000; - if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now) ++ long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata); ++ if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec) + return Collections.emptySet(); + return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>() { public boolean apply(InetAddress inetAddress) http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/UUIDGen.java index a673f05,78b8b57..3efcb5e --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@@ -25,9 -25,9 +25,11 @@@ import java.security.SecureRandom import java.util.Collection; import java.util.Random; import java.util.UUID; ++import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; ++import com.google.common.primitives.Ints; /** @@@ -203,6 -203,6 +205,15 @@@ public class UUIDGe /** * @param uuid ++ * 
@return seconds since Unix epoch ++ */ ++ public static int unixTimestampInSec(UUID uuid) ++ { ++ return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid))); ++ } ++ ++ /** ++ * @param uuid * @return microseconds since Unix epoch */ public static long microsTimestamp(UUID uuid)