Repository: cassandra Updated Branches: refs/heads/trunk f31d1a05a -> aed682513
Use consistent nowInSeconds and timestamps values within a request patch by Aleksey Yeschenko; reviewed by Chris Lohfink for CASSANDRA-14671 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aed68251 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aed68251 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aed68251 Branch: refs/heads/trunk Commit: aed682513cc381b80705d1f971fddc394e8a62a5 Parents: f31d1a05 Author: Aleksey Yeshchenko <alek...@apple.com> Authored: Fri Aug 31 11:13:03 2018 +0100 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Fri Aug 31 18:29:33 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cql3/BatchQueryOptions.java | 4 +- .../org/apache/cassandra/cql3/QueryOptions.java | 4 +- .../apache/cassandra/cql3/UpdateParameters.java | 4 +- .../cql3/statements/BatchStatement.java | 64 +++++++----- .../cql3/statements/CQL3CasRequest.java | 43 +++++--- .../cql3/statements/ModificationStatement.java | 101 +++++++++++++------ .../cql3/statements/SelectStatement.java | 7 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 10 +- .../apache/cassandra/service/QueryState.java | 54 +++++++--- .../org/apache/cassandra/cql3/ListsTest.java | 4 +- .../cassandra/transport/SerDeserTest.java | 7 +- .../io/sstable/StressCQLSSTableWriter.java | 9 +- 13 files changed, 206 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 475cd48..e40cf27 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) * Add sampler for query time and expose with nodetool (CASSANDRA-14436) * Clean up Message.Request implementations (CASSANDRA-14677) * Disable old native protocol versions on demand (CASANDRA-14659) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java index ac0d148..ac8f179 100644 --- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java @@ -84,9 +84,9 @@ public abstract class BatchQueryOptions return wrapped.getTimestamp(state); } - public int getNowInSeconds() + public int getNowInSeconds(QueryState state) { - return wrapped.getNowInSeconds(); + return wrapped.getNowInSeconds(state); } private static class WithoutPerStatementVariables extends BatchQueryOptions http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/QueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index e546304..f76d6b2 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -200,10 +200,10 @@ public abstract class QueryOptions return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp(); } - public int getNowInSeconds() + public int getNowInSeconds(QueryState state) { int nowInSeconds = getSpecificOptions().nowInSeconds; - return Integer.MIN_VALUE == nowInSeconds ? FBUtilities.nowInSeconds() : nowInSeconds; + return nowInSeconds != Integer.MIN_VALUE ? nowInSeconds : state.getNowInSeconds(); } /** The keyspace that this query is bound to, or null if not relevant. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 500862e..740cd91 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -28,7 +28,6 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.utils.FBUtilities; /** * Groups the parameters of an update query, and make building updates easier. @@ -58,6 +57,7 @@ public class UpdateParameters RegularAndStaticColumns updatedColumns, QueryOptions options, long timestamp, + int nowInSec, int ttl, Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException @@ -66,7 +66,7 @@ public class UpdateParameters this.updatedColumns = updatedColumns; this.options = options; - this.nowInSec = options.getNowInSeconds(); + this.nowInSec = nowInSec; this.timestamp = timestamp; this.ttl = ttl; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 089c532..e925735 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -261,8 +261,11 @@ public class BatchStatement implements CQLStatement return statements; } - private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime) - throws RequestExecutionException, RequestValidationException + private Collection<? extends IMutation> getMutations(BatchQueryOptions options, + boolean local, + long batchTimestamp, + int nowInSeconds, + long queryStartNanoTime) { Set<String> tablesWithZeroGcGs = null; BatchUpdatesCollector collector = new BatchUpdatesCollector(updatedColumns, updatedRows()); @@ -276,8 +279,8 @@ public class BatchStatement implements CQLStatement tablesWithZeroGcGs.add(statement.metadata.toString()); } QueryOptions statementOptions = options.forStatement(i); - long timestamp = attrs.getTimestamp(now, statementOptions); - statement.addUpdates(collector, statementOptions, local, timestamp, queryStartNanoTime); + long timestamp = attrs.getTimestamp(batchTimestamp, statementOptions); + statement.addUpdates(collector, statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime); } if (tablesWithZeroGcGs != null) @@ -372,19 +375,16 @@ public class BatchStatement implements CQLStatement } - public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime) { return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options), queryStartNanoTime); } - public ResultMessage execute(QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + public ResultMessage execute(QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) { - return execute(queryState, options, false, options.getTimestamp(queryState), queryStartNanoTime); - } + long timestamp = options.getTimestamp(queryState); + int nowInSeconds = options.getNowInSeconds(queryState); - private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now, long queryStartNanoTime) - throws RequestExecutionException, RequestValidationException - { if (options.getConsistency() == null) throw new InvalidRequestException("Invalid empty consistency level"); if (options.getSerialConsistency() == null) @@ -396,7 +396,7 @@ public class BatchStatement implements CQLStatement if (updatesVirtualTables) executeInternalWithoutCondition(queryState, options, queryStartNanoTime); else - executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); + executeWithoutConditions(getMutations(options, false, timestamp, nowInSeconds, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); return new ResultMessage.Void(); } @@ -427,7 +427,6 @@ public class BatchStatement implements CQLStatement } private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime) - throws RequestExecutionException, RequestValidationException { Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state); CQL3CasRequest casRequest = p.left; @@ -443,22 +442,23 @@ public class BatchStatement implements CQLStatement options.getSerialConsistency(), options.getConsistency(), state.getClientState(), - options.getNowInSeconds(), + options.getNowInSeconds(state), queryStartNanoTime)) { - return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, + state, options.forStatement(0))); } } private Pair<CQL3CasRequest,Set<ColumnMetadata>> makeCasRequest(BatchQueryOptions options, QueryState state) { - long now = state.getTimestamp(); + long batchTimestamp = options.getTimestamp(state); + int nowInSeconds = options.getNowInSeconds(state); DecoratedKey key = null; CQL3CasRequest casRequest = null; Set<ColumnMetadata> columnsWithConditions = new LinkedHashSet<>(); @@ -467,14 +467,14 @@ public class BatchStatement implements CQLStatement { ModificationStatement statement = statements.get(i); QueryOptions statementOptions = options.forStatement(i); - long timestamp = attrs.getTimestamp(now, statementOptions); + long timestamp = attrs.getTimestamp(batchTimestamp, statementOptions); List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementOptions); if (statement.getRestrictions().keyIsInRelation()) throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)"); if (key == null) { key = statement.metadata().partitioner.decorateKey(pks.get(0)); - casRequest = new CQL3CasRequest(statement.metadata(), key, true, conditionColumns, updatesRegularRows, updatesStaticRow); + casRequest = new CQL3CasRequest(statement.metadata(), key, conditionColumns, updatesRegularRows, updatesStaticRow); } else if (!key.getKey().equals(pks.get(0))) { @@ -497,7 +497,7 @@ public class BatchStatement implements CQLStatement for (Slice slice : slices) { - casRequest.addRangeDeletion(slice, statement, statementOptions, timestamp); + casRequest.addRangeDeletion(slice, statement, statementOptions, timestamp, nowInSeconds); } } @@ -513,7 +513,7 @@ public class BatchStatement implements CQLStatement else if (columnsWithConditions != null) Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions()); } - casRequest.addRowUpdate(clustering, statement, statementOptions, timestamp); + casRequest.addRowUpdate(clustering, statement, statementOptions, timestamp, nowInSeconds); } } @@ -536,14 +536,17 @@ public class BatchStatement implements CQLStatement return new ResultMessage.Void(); } - private ResultMessage executeInternalWithoutCondition(QueryState queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException + private ResultMessage executeInternalWithoutCondition(QueryState queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) { - for (IMutation mutation : getMutations(batchOptions, true, queryState.getTimestamp(), queryStartNanoTime)) + long timestamp = batchOptions.getTimestamp(queryState); + int nowInSeconds = batchOptions.getNowInSeconds(queryState); + + for (IMutation mutation : getMutations(batchOptions, true, timestamp, nowInSeconds, queryStartNanoTime)) mutation.apply(); return null; } - private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state) throws RequestExecutionException, RequestValidationException + private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state) { Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state); CQL3CasRequest request = p.left; @@ -552,9 +555,20 @@ public class BatchStatement implements CQLStatement String ksName = request.metadata.keyspace; String tableName = request.metadata.name; - try (RowIterator result = ModificationStatement.casInternal(request, state, options.getNowInSeconds())) + long timestamp = options.getTimestamp(state); + int nowInSeconds = options.getNowInSeconds(state); + + try (RowIterator result = ModificationStatement.casInternal(request, timestamp, nowInSeconds)) { - return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0))); + ResultSet resultSet = + ModificationStatement.buildCasResultSet(ksName, + tableName, + result, + columnsWithConditions, + true, + state, + options.forStatement(0)); + return new ResultMessage.Rows(resultSet); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 7953c8b..ed985db 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -45,7 +45,6 @@ public class CQL3CasRequest implements CASRequest { public final TableMetadata metadata; public final DecoratedKey key; - public final boolean isBatch; private final RegularAndStaticColumns conditionColumns; private final boolean updatesRegularRows; private final boolean updatesStaticRow; @@ -64,7 +63,6 @@ public class CQL3CasRequest implements CASRequest public CQL3CasRequest(TableMetadata metadata, DecoratedKey key, - boolean isBatch, RegularAndStaticColumns conditionColumns, boolean updatesRegularRows, boolean updatesStaticRow) @@ -72,20 +70,19 @@ public class CQL3CasRequest implements CASRequest this.metadata = metadata; this.key = key; this.conditions = new TreeMap<>(metadata.comparator); - this.isBatch = isBatch; this.conditionColumns = conditionColumns; this.updatesRegularRows = updatesRegularRows; this.updatesStaticRow = updatesStaticRow; } - public void addRowUpdate(Clustering clustering, ModificationStatement stmt, QueryOptions options, long timestamp) + void addRowUpdate(Clustering clustering, ModificationStatement stmt, QueryOptions options, long timestamp, int nowInSeconds) { - updates.add(new RowUpdate(clustering, stmt, options, timestamp)); + updates.add(new RowUpdate(clustering, stmt, options, timestamp, nowInSeconds)); } - public void addRangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp) + void addRangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp, int nowInSeconds) { - rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp)); + rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp, nowInSeconds)); } public void addNotExist(Clustering clustering) throws InvalidRequestException @@ -262,19 +259,28 @@ public class CQL3CasRequest implements CASRequest private final ModificationStatement stmt; private final QueryOptions options; private final long timestamp; + private final int nowInSeconds; - private RowUpdate(Clustering clustering, ModificationStatement stmt, QueryOptions options, long timestamp) + private RowUpdate(Clustering clustering, ModificationStatement stmt, QueryOptions options, long timestamp, int nowInSeconds) { this.clustering = clustering; this.stmt = stmt; this.options = options; this.timestamp = timestamp; + this.nowInSeconds = nowInSeconds; } - public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException + void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) { Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null; - UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map); + UpdateParameters params = + new UpdateParameters(metadata, + updateBuilder.columns(), + options, + timestamp, + nowInSeconds, + stmt.getTimeToLive(options), + map); stmt.addUpdateForKey(updateBuilder, clustering, params); } } @@ -285,20 +291,29 @@ public class CQL3CasRequest implements CASRequest private final ModificationStatement stmt; private final QueryOptions options; private final long timestamp; + private final int nowInSeconds; - private RangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp) + private RangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp, int nowInSeconds) { this.slice = slice; this.stmt = stmt; this.options = options; this.timestamp = timestamp; + this.nowInSeconds = nowInSeconds; } - public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException + void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) { // No slice statements currently require a read, but this maintains consistency with RowUpdate, and future proofs us - Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null; - UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map); + Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null; + UpdateParameters params = + new UpdateParameters(metadata, + updateBuilder.columns(), + options, + timestamp, + nowInSeconds, + stmt.getTimeToLive(options), + map); stmt.addUpdateForKey(updateBuilder, slice, params); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 5f3d07f..13fc659 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -465,15 +465,19 @@ public abstract class ModificationStatement implements CQLStatement else cl.validateForWrite(metadata.keyspace); - Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryStartNanoTime); + Collection<? extends IMutation> mutations = + getMutations(options, + false, + options.getTimestamp(queryState), + options.getNowInSeconds(queryState), + queryStartNanoTime); if (!mutations.isEmpty()) StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime); return null; } - public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) - throws RequestExecutionException, RequestValidationException + private ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) { CQL3CasRequest request = makeCasRequest(queryState, options); @@ -484,10 +488,10 @@ public abstract class ModificationStatement implements CQLStatement options.getSerialConsistency(), options.getConsistency(), queryState.getClientState(), - options.getNowInSeconds(), + options.getNowInSeconds(queryState), queryStartNanoTime)) { - return new ResultMessage.Rows(buildCasResultSet(result, options)); + return new ResultMessage.Rows(buildCasResultSet(result, queryState, options)); } } @@ -500,17 +504,18 @@ public abstract class ModificationStatement implements CQLStatement type.isUpdate()? "updates" : "deletions"); DecoratedKey key = metadata().partitioner.decorateKey(keys.get(0)); - long now = options.getTimestamp(queryState); + long timestamp = options.getTimestamp(queryState); + int nowInSeconds = options.getNowInSeconds(queryState); checkFalse(restrictions.clusteringKeyRestrictionsHasIN(), "IN on the clustering key columns is not supported with conditional %s", type.isUpdate()? "updates" : "deletions"); Clustering clustering = Iterables.getOnlyElement(createClustering(options)); - CQL3CasRequest request = new CQL3CasRequest(metadata(), key, false, conditionColumns(), updatesRegularRows(), updatesStaticRow()); + CQL3CasRequest request = new CQL3CasRequest(metadata(), key, conditionColumns(), updatesRegularRows(), updatesStaticRow()); addConditions(clustering, request, options); - request.addRowUpdate(clustering, this, options, now); + request.addRowUpdate(clustering, this, options, timestamp, nowInSeconds); return request; } @@ -533,13 +538,18 @@ public abstract class ModificationStatement implements CQLStatement return new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance); } - private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException + private ResultSet buildCasResultSet(RowIterator partition, QueryState state, QueryOptions options) { - return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options); + return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, state, options); } - public static ResultSet buildCasResultSet(String ksName, String tableName, RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions options) - throws InvalidRequestException + static ResultSet buildCasResultSet(String ksName, + String tableName, + RowIterator partition, + Iterable<ColumnMetadata> columnsWithConditions, + boolean isBatch, + QueryState state, + QueryOptions options) { boolean success = partition == null; @@ -547,7 +557,7 @@ public abstract class ModificationStatement implements CQLStatement List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success))); ResultSet rs = new ResultSet(metadata, rows); - return success ? rs : merge(rs, buildCasFailureResultSet(partition, columnsWithConditions, isBatch, options)); + return success ? rs : merge(rs, buildCasFailureResultSet(partition, columnsWithConditions, isBatch, options, options.getNowInSeconds(state))); } private static ResultSet merge(ResultSet left, ResultSet right) @@ -573,8 +583,11 @@ public abstract class ModificationStatement implements CQLStatement return new ResultSet(new ResultSet.ResultMetadata(EMPTY_HASH, specs), rows); } - private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions options) - throws InvalidRequestException + private static ResultSet buildCasFailureResultSet(RowIterator partition, + Iterable<ColumnMetadata> columnsWithConditions, + boolean isBatch, + QueryOptions options, + int nowInSeconds) { TableMetadata metadata = partition.metadata(); Selection selection; @@ -598,10 +611,8 @@ public abstract class ModificationStatement implements CQLStatement Selectors selectors = selection.newSelectors(options); ResultSetBuilder builder = new ResultSetBuilder(selection.getResultMetadata(), selectors); - SelectStatement.forSelection(metadata, selection).processPartition(partition, - options, - builder, - options.getNowInSeconds()); + SelectStatement.forSelection(metadata, selection) + .processPartition(partition, options, builder, nowInSeconds); return builder.build(); } @@ -613,25 +624,29 @@ public abstract class ModificationStatement implements CQLStatement : executeInternalWithoutCondition(queryState, options, System.nanoTime()); } - public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException + public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) + throws RequestValidationException, RequestExecutionException { - for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryStartNanoTime)) + long timestamp = options.getTimestamp(queryState); + int nowInSeconds = options.getNowInSeconds(queryState); + for (IMutation mutation : getMutations(options, true, timestamp, nowInSeconds, queryStartNanoTime)) mutation.apply(); return null; } - public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException + public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) { CQL3CasRequest request = makeCasRequest(state, options); - try (RowIterator result = casInternal(request, state, options.getNowInSeconds())) + + try (RowIterator result = casInternal(request, options.getTimestamp(state), options.getNowInSeconds(state))) { - return new ResultMessage.Rows(buildCasResultSet(result, options)); + return new ResultMessage.Rows(buildCasResultSet(result, state, options)); } } - static RowIterator casInternal(CQL3CasRequest request, QueryState state, int nowInSeconds) + static RowIterator casInternal(CQL3CasRequest request, long timestamp, int nowInSeconds) { - UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp()); + UUID ballot = UUIDGen.getTimeUUIDFromMicros(timestamp); SinglePartitionReadQuery readCommand = request.readCommand(nowInSeconds); FilteredPartition current; @@ -661,10 +676,14 @@ public abstract class ModificationStatement implements CQLStatement * * @return list of the mutations */ - private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long timestamp, long queryStartNanoTime) + private Collection<? extends IMutation> getMutations(QueryOptions options, + boolean local, + long timestamp, + int nowInSeconds, + long queryStartNanoTime) { UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1); - addUpdates(collector, options, local, timestamp, queryStartNanoTime); + addUpdates(collector, options, local, timestamp, nowInSeconds, queryStartNanoTime); return collector.toMutations(); } @@ -672,6 +691,7 @@ public abstract class ModificationStatement implements CQLStatement QueryOptions options, boolean local, long timestamp, + int nowInSeconds, long queryStartNanoTime) { List<ByteBuffer> keys = buildPartitionKeyNames(options); @@ -690,6 +710,7 @@ public abstract class ModificationStatement implements CQLStatement DataLimits.NONE, local, timestamp, + nowInSeconds, queryStartNanoTime); for (ByteBuffer key : keys) { @@ -710,7 +731,7 @@ public abstract class ModificationStatement implements CQLStatement if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty()) return; - UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, timestamp, queryStartNanoTime); + UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, timestamp, nowInSeconds, queryStartNanoTime); for (ByteBuffer key : keys) { @@ -754,6 +775,7 @@ public abstract class ModificationStatement implements CQLStatement QueryOptions options, boolean local, long timestamp, + int nowInSeconds, long queryStartNanoTime) { if (clusterings.contains(Clustering.STATIC_CLUSTERING)) @@ -763,6 +785,7 @@ public abstract class ModificationStatement implements CQLStatement DataLimits.cqlLimits(1), local, timestamp, + nowInSeconds, queryStartNanoTime); return makeUpdateParameters(keys, @@ -771,6 +794,7 @@ public abstract class ModificationStatement implements CQLStatement DataLimits.NONE, local, timestamp, + nowInSeconds, queryStartNanoTime); } @@ -780,11 +804,26 @@ public abstract class ModificationStatement implements CQLStatement DataLimits limits, boolean local, long timestamp, + int nowInSeconds, long queryStartNanoTime) { // Some lists operation requires reading - Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), options.getNowInSeconds(), queryStartNanoTime); - return new UpdateParameters(metadata(), updatedColumns(), options, getTimestamp(timestamp, options), getTimeToLive(options), lists); + Map<DecoratedKey, Partition> lists = + readRequiredLists(keys, + filter, + limits, + local, + options.getConsistency(), + nowInSeconds, + queryStartNanoTime); + + return new UpdateParameters(metadata(), + updatedColumns(), + options, + getTimestamp(timestamp, options), + nowInSeconds, + getTimeToLive(options), + lists); } private Slices toSlices(SortedSet<ClusteringBound> startBounds, SortedSet<ClusteringBound> endBounds) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 61715b9..f847a6e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -66,7 +66,6 @@ import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -227,14 +226,14 @@ public class SelectStatement implements CQLStatement // Nothing to do, all validation has been done by RawStatement.prepare() } - public ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + public ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime) { ConsistencyLevel cl = options.getConsistency(); checkNotNull(cl, "Invalid empty consistency level"); cl.validateForRead(keyspace()); - int nowInSec = options.getNowInSeconds(); + int nowInSec = options.getNowInSeconds(state); int userLimit = getLimit(options); int userPerPartitionLimit = getPerPartitionLimit(options); int pageSize = options.getPageSize(); @@ -428,7 +427,7 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeLocally(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - return executeInternal(state, options, options.getNowInSeconds(), System.nanoTime()); + return executeInternal(state, options, options.getNowInSeconds(state), System.nanoTime()); } public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 0c2cf28..b24f595 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SortedSet; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.datastax.driver.core.ProtocolVersion; @@ -43,10 +44,8 @@ import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -250,15 +249,16 @@ public class CQLSSTableWriter implements Closeable List<ByteBuffer> keys = insert.buildPartitionKeyNames(options); SortedSet<Clustering> clusterings = insert.createClustering(options); - long now = System.currentTimeMillis() * 1000; + long now = System.currentTimeMillis(); // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open' // and that forces a lot of initialization that we don't want. UpdateParameters params = new UpdateParameters(insert.metadata, insert.updatedColumns(), options, - insert.getTimestamp(now, options), + insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options), + (int) TimeUnit.MILLISECONDS.toSeconds(now), insert.getTimeToLive(options), - Collections.<DecoratedKey, Partition>emptyMap()); + Collections.emptyMap()); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index b266fb8..d1b03d4 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -22,7 +22,10 @@ import java.net.InetAddress; import org.apache.cassandra.utils.FBUtilities; /** - * Represents the state related to a given query. + * Primarily used as a recorder for server-generated timestamps (timestamp, in microseconds, and nowInSeconds - in, well, seconds). + * + * The goal is to be able to use a single consistent server-generated value for both timestamps across the whole request, + * and later be able to inspect QueryState for the generated values - for logging or other purposes. */ public class QueryState { @@ -37,6 +40,41 @@ public class QueryState } /** + * Generate, cache, and record a timestamp value on the server-side. + * + * Used in reads for all live and expiring cells, and all kinds of deletion infos. + * + * Shouldn't be used directly. {@link org.apache.cassandra.cql3.QueryOptions#getTimestamp(QueryState)} should be used + * by all consumers. + * + * @return server-generated, recorded timestamp in seconds + */ + public long getTimestamp() + { + if (timestamp == Long.MIN_VALUE) + timestamp = clientState.getTimestamp(); + return timestamp; + } + + /** + * Generate, cache, and record a nowInSeconds value on the server-side. + * + * In writes is used for calculating localDeletionTime for tombstones and expiring cells and other deletion infos. + * In reads used to determine liveness of expiring cells and rows. + * + * Shouldn't be used directly. {@link org.apache.cassandra.cql3.QueryOptions#getNowInSeconds(QueryState)} should be used + * by all consumers. + * + * @return server-generated, recorded timestamp in seconds + */ + public int getNowInSeconds() + { + if (nowInSeconds == Integer.MIN_VALUE) + nowInSeconds = FBUtilities.nowInSeconds(); + return nowInSeconds; + } + + /** * @return a QueryState object for internal C* calls (not limited by any kind of auth). */ public static QueryState forInternalCalls() @@ -53,18 +91,4 @@ public class QueryState { return clientState.getClientAddress(); } - - public long getTimestamp() - { - if (timestamp == Long.MIN_VALUE) - timestamp = clientState.getTimestamp(); - return timestamp; - } - - public int getNowInSeconds() - { - if (nowInSeconds == Integer.MIN_VALUE) - nowInSeconds = FBUtilities.nowInSeconds(); - return nowInSeconds; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/test/unit/org/apache/cassandra/cql3/ListsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java b/test/unit/org/apache/cassandra/cql3/ListsTest.java index a377b96..1155619 100644 --- a/test/unit/org/apache/cassandra/cql3/ListsTest.java +++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; public class ListsTest extends CQLTester @@ -140,7 +141,8 @@ public class ListsTest extends CQLTester ByteBuffer keyBuf = ByteBufferUtil.bytes("key"); DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf); - UpdateParameters parameters = new UpdateParameters(metaData, null, QueryOptions.DEFAULT, System.currentTimeMillis(), 1000, Collections.emptyMap()); + UpdateParameters parameters = + new UpdateParameters(metaData, null, QueryOptions.DEFAULT, System.currentTimeMillis(), FBUtilities.nowInSeconds(), 1000, Collections.emptyMap()); Clustering clustering = Clustering.make(ByteBufferUtil.bytes(1)); parameters.newRow(clustering); prepender.execute(key, parameters); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index 9d07321..42ffa26 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -32,6 +32,8 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.serializers.CollectionSerializer; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.Event.TopologyChange; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.StatusChange; @@ -365,6 +367,8 @@ public class SerDeserTest QueryOptions.codec.encode(options, buf, version); QueryOptions decodedOptions = QueryOptions.codec.decode(buf, version); + QueryState state = new QueryState(ClientState.forInternalCalls()); + assertNotNull(decodedOptions); assertEquals(options.getConsistency(), decodedOptions.getConsistency()); assertEquals(options.getSerialConsistency(), decodedOptions.getSerialConsistency()); @@ -374,6 +378,7 @@ public class SerDeserTest assertEquals(options.getPagingState(), decodedOptions.getPagingState()); assertEquals(options.skipMetadata(), decodedOptions.skipMetadata()); assertEquals(options.getKeyspace(), decodedOptions.getKeyspace()); - assertEquals(options.getNowInSeconds(), decodedOptions.getNowInSeconds()); + assertEquals(options.getTimestamp(state), decodedOptions.getTimestamp(state)); + assertEquals(options.getNowInSeconds(state), decodedOptions.getNowInSeconds(state)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java index 7dabe84..dea4eeb 100644 --- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; @@ -45,7 +46,6 @@ import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -246,15 +246,16 @@ public class StressCQLSSTableWriter implements Closeable List<ByteBuffer> keys = insert.buildPartitionKeyNames(options); SortedSet<Clustering> clusterings = insert.createClustering(options); - long now = System.currentTimeMillis() * 1000; + long now = System.currentTimeMillis(); // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open' // and that forces a lot of initialization that we don't want. UpdateParameters params = new UpdateParameters(insert.metadata(), insert.updatedColumns(), options, - insert.getTimestamp(now, options), + insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options), + (int) TimeUnit.MILLISECONDS.toSeconds(now), insert.getTimeToLive(options), - Collections.<DecoratedKey, Partition>emptyMap()); + Collections.emptyMap()); try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org