Repository: cassandra
Updated Branches:
  refs/heads/trunk f31d1a05a -> aed682513


Use consistent nowInSeconds and timestamps values within a request

patch by Aleksey Yeschenko; reviewed by Chris Lohfink for
CASSANDRA-14671


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aed68251
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aed68251
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aed68251

Branch: refs/heads/trunk
Commit: aed682513cc381b80705d1f971fddc394e8a62a5
Parents: f31d1a05
Author: Aleksey Yeshchenko <alek...@apple.com>
Authored: Fri Aug 31 11:13:03 2018 +0100
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Fri Aug 31 18:29:33 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/cql3/BatchQueryOptions.java       |   4 +-
 .../org/apache/cassandra/cql3/QueryOptions.java |   4 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cql3/statements/BatchStatement.java         |  64 +++++++-----
 .../cql3/statements/CQL3CasRequest.java         |  43 +++++---
 .../cql3/statements/ModificationStatement.java  | 101 +++++++++++++------
 .../cql3/statements/SelectStatement.java        |   7 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  10 +-
 .../apache/cassandra/service/QueryState.java    |  54 +++++++---
 .../org/apache/cassandra/cql3/ListsTest.java    |   4 +-
 .../cassandra/transport/SerDeserTest.java       |   7 +-
 .../io/sstable/StressCQLSSTableWriter.java      |   9 +-
 13 files changed, 206 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 475cd48..e40cf27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
  * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
  * Clean up Message.Request implementations (CASSANDRA-14677)
  * Disable old native protocol versions on demand (CASANDRA-14659)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index ac0d148..ac8f179 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -84,9 +84,9 @@ public abstract class BatchQueryOptions
         return wrapped.getTimestamp(state);
     }
 
-    public int getNowInSeconds()
+    public int getNowInSeconds(QueryState state)
     {
-        return wrapped.getNowInSeconds();
+        return wrapped.getNowInSeconds(state);
     }
 
     private static class WithoutPerStatementVariables extends BatchQueryOptions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index e546304..f76d6b2 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -200,10 +200,10 @@ public abstract class QueryOptions
         return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp();
     }
 
-    public int getNowInSeconds()
+    public int getNowInSeconds(QueryState state)
     {
         int nowInSeconds = getSpecificOptions().nowInSeconds;
-        return Integer.MIN_VALUE == nowInSeconds ? FBUtilities.nowInSeconds() : nowInSeconds;
+        return nowInSeconds != Integer.MIN_VALUE ? nowInSeconds : state.getNowInSeconds();
     }
 
     /** The keyspace that this query is bound to, or null if not relevant. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 500862e..740cd91 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Groups the parameters of an update query, and make building updates easier.
@@ -58,6 +57,7 @@ public class UpdateParameters
                             RegularAndStaticColumns updatedColumns,
                             QueryOptions options,
                             long timestamp,
+                            int nowInSec,
                             int ttl,
                             Map<DecoratedKey, Partition> prefetchedRows)
     throws InvalidRequestException
@@ -66,7 +66,7 @@ public class UpdateParameters
         this.updatedColumns = updatedColumns;
         this.options = options;
 
-        this.nowInSec = options.getNowInSeconds();
+        this.nowInSec = nowInSec;
         this.timestamp = timestamp;
         this.ttl = ttl;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 089c532..e925735 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -261,8 +261,11 @@ public class BatchStatement implements CQLStatement
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions 
options, boolean local, long now, long queryStartNanoTime)
-    throws RequestExecutionException, RequestValidationException
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions 
options,
+                                                         boolean local,
+                                                         long batchTimestamp,
+                                                         int nowInSeconds,
+                                                         long 
queryStartNanoTime)
     {
         Set<String> tablesWithZeroGcGs = null;
         BatchUpdatesCollector collector = new 
BatchUpdatesCollector(updatedColumns, updatedRows());
@@ -276,8 +279,8 @@ public class BatchStatement implements CQLStatement
                 tablesWithZeroGcGs.add(statement.metadata.toString());
             }
             QueryOptions statementOptions = options.forStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementOptions);
-            statement.addUpdates(collector, statementOptions, local, 
timestamp, queryStartNanoTime);
+            long timestamp = attrs.getTimestamp(batchTimestamp, 
statementOptions);
+            statement.addUpdates(collector, statementOptions, local, 
timestamp, nowInSeconds, queryStartNanoTime);
         }
 
         if (tablesWithZeroGcGs != null)
@@ -372,19 +375,16 @@ public class BatchStatement implements CQLStatement
     }
 
 
-    public ResultMessage execute(QueryState queryState, QueryOptions options, 
long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
+    public ResultMessage execute(QueryState queryState, QueryOptions options, 
long queryStartNanoTime)
     {
         return execute(queryState, 
BatchQueryOptions.withoutPerStatementVariables(options), queryStartNanoTime);
     }
 
-    public ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
+    public ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, long queryStartNanoTime)
     {
-        return execute(queryState, options, false, 
options.getTimestamp(queryState), queryStartNanoTime);
-    }
+        long timestamp = options.getTimestamp(queryState);
+        int nowInSeconds = options.getNowInSeconds(queryState);
 
-    private ResultMessage execute(QueryState queryState, BatchQueryOptions 
options, boolean local, long now, long queryStartNanoTime)
-    throws RequestExecutionException, RequestValidationException
-    {
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency 
level");
         if (options.getSerialConsistency() == null)
@@ -396,7 +396,7 @@ public class BatchStatement implements CQLStatement
         if (updatesVirtualTables)
             executeInternalWithoutCondition(queryState, options, 
queryStartNanoTime);
         else    
-            executeWithoutConditions(getMutations(options, local, now, 
queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+            executeWithoutConditions(getMutations(options, false, timestamp, 
nowInSeconds, queryStartNanoTime), options.getConsistency(), 
queryStartNanoTime);
 
         return new ResultMessage.Void();
     }
@@ -427,7 +427,6 @@ public class BatchStatement implements CQLStatement
     }
 
     private ResultMessage executeWithConditions(BatchQueryOptions options, 
QueryState state, long queryStartNanoTime)
-    throws RequestExecutionException, RequestValidationException
     {
         Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state);
         CQL3CasRequest casRequest = p.left;
@@ -443,22 +442,23 @@ public class BatchStatement implements CQLStatement
                                                    
options.getSerialConsistency(),
                                                    options.getConsistency(),
                                                    state.getClientState(),
-                                                   options.getNowInSeconds(),
+                                                   
options.getNowInSeconds(state),
                                                    queryStartNanoTime))
         {
-
             return new 
ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName,
                                                                                
   tableName,
                                                                                
   result,
                                                                                
   columnsWithConditions,
                                                                                
   true,
+                                                                               
   state,
                                                                                
   options.forStatement(0)));
         }
     }
 
     private Pair<CQL3CasRequest,Set<ColumnMetadata>> 
makeCasRequest(BatchQueryOptions options, QueryState state)
     {
-        long now = state.getTimestamp();
+        long batchTimestamp = options.getTimestamp(state);
+        int nowInSeconds = options.getNowInSeconds(state);
         DecoratedKey key = null;
         CQL3CasRequest casRequest = null;
         Set<ColumnMetadata> columnsWithConditions = new LinkedHashSet<>();
@@ -467,14 +467,14 @@ public class BatchStatement implements CQLStatement
         {
             ModificationStatement statement = statements.get(i);
             QueryOptions statementOptions = options.forStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementOptions);
+            long timestamp = attrs.getTimestamp(batchTimestamp, 
statementOptions);
             List<ByteBuffer> pks = 
statement.buildPartitionKeyNames(statementOptions);
             if (statement.getRestrictions().keyIsInRelation())
                 throw new IllegalArgumentException("Batch with conditions 
cannot span multiple partitions (you cannot use IN on the partition key)");
             if (key == null)
             {
                 key = statement.metadata().partitioner.decorateKey(pks.get(0));
-                casRequest = new CQL3CasRequest(statement.metadata(), key, 
true, conditionColumns, updatesRegularRows, updatesStaticRow);
+                casRequest = new CQL3CasRequest(statement.metadata(), key, 
conditionColumns, updatesRegularRows, updatesStaticRow);
             }
             else if (!key.getKey().equals(pks.get(0)))
             {
@@ -497,7 +497,7 @@ public class BatchStatement implements CQLStatement
 
                 for (Slice slice : slices)
                 {
-                    casRequest.addRangeDeletion(slice, statement, 
statementOptions, timestamp);
+                    casRequest.addRangeDeletion(slice, statement, 
statementOptions, timestamp, nowInSeconds);
                 }
 
             }
@@ -513,7 +513,7 @@ public class BatchStatement implements CQLStatement
                     else if (columnsWithConditions != null)
                         Iterables.addAll(columnsWithConditions, 
statement.getColumnsWithConditions());
                 }
-                casRequest.addRowUpdate(clustering, statement, 
statementOptions, timestamp);
+                casRequest.addRowUpdate(clustering, statement, 
statementOptions, timestamp, nowInSeconds);
             }
         }
 
@@ -536,14 +536,17 @@ public class BatchStatement implements CQLStatement
         return new ResultMessage.Void();
     }
 
-    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
+    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, BatchQueryOptions batchOptions, long queryStartNanoTime)
     {
-        for (IMutation mutation : getMutations(batchOptions, true, 
queryState.getTimestamp(), queryStartNanoTime))
+        long timestamp = batchOptions.getTimestamp(queryState);
+        int nowInSeconds = batchOptions.getNowInSeconds(queryState);
+
+        for (IMutation mutation : getMutations(batchOptions, true, timestamp, 
nowInSeconds, queryStartNanoTime))
             mutation.apply();
         return null;
     }
 
-    private ResultMessage executeInternalWithConditions(BatchQueryOptions 
options, QueryState state) throws RequestExecutionException, 
RequestValidationException
+    private ResultMessage executeInternalWithConditions(BatchQueryOptions 
options, QueryState state)
     {
         Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state);
         CQL3CasRequest request = p.left;
@@ -552,9 +555,20 @@ public class BatchStatement implements CQLStatement
         String ksName = request.metadata.keyspace;
         String tableName = request.metadata.name;
 
-        try (RowIterator result = ModificationStatement.casInternal(request, 
state, options.getNowInSeconds()))
+        long timestamp = options.getTimestamp(state);
+        int nowInSeconds = options.getNowInSeconds(state);
+
+        try (RowIterator result = ModificationStatement.casInternal(request, 
timestamp, nowInSeconds))
         {
-            return new 
ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, 
result, columnsWithConditions, true, options.forStatement(0)));
+            ResultSet resultSet =
+                ModificationStatement.buildCasResultSet(ksName,
+                                                        tableName,
+                                                        result,
+                                                        columnsWithConditions,
+                                                        true,
+                                                        state,
+                                                        
options.forStatement(0));
+            return new ResultMessage.Rows(resultSet);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 7953c8b..ed985db 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -45,7 +45,6 @@ public class CQL3CasRequest implements CASRequest
 {
     public final TableMetadata metadata;
     public final DecoratedKey key;
-    public final boolean isBatch;
     private final RegularAndStaticColumns conditionColumns;
     private final boolean updatesRegularRows;
     private final boolean updatesStaticRow;
@@ -64,7 +63,6 @@ public class CQL3CasRequest implements CASRequest
 
     public CQL3CasRequest(TableMetadata metadata,
                           DecoratedKey key,
-                          boolean isBatch,
                           RegularAndStaticColumns conditionColumns,
                           boolean updatesRegularRows,
                           boolean updatesStaticRow)
@@ -72,20 +70,19 @@ public class CQL3CasRequest implements CASRequest
         this.metadata = metadata;
         this.key = key;
         this.conditions = new TreeMap<>(metadata.comparator);
-        this.isBatch = isBatch;
         this.conditionColumns = conditionColumns;
         this.updatesRegularRows = updatesRegularRows;
         this.updatesStaticRow = updatesStaticRow;
     }
 
-    public void addRowUpdate(Clustering clustering, ModificationStatement 
stmt, QueryOptions options, long timestamp)
+    void addRowUpdate(Clustering clustering, ModificationStatement stmt, 
QueryOptions options, long timestamp, int nowInSeconds)
     {
-        updates.add(new RowUpdate(clustering, stmt, options, timestamp));
+        updates.add(new RowUpdate(clustering, stmt, options, timestamp, 
nowInSeconds));
     }
 
-    public void addRangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+    void addRangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp, int nowInSeconds)
     {
-        rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp));
+        rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp, 
nowInSeconds));
     }
 
     public void addNotExist(Clustering clustering) throws 
InvalidRequestException
@@ -262,19 +259,28 @@ public class CQL3CasRequest implements CASRequest
         private final ModificationStatement stmt;
         private final QueryOptions options;
         private final long timestamp;
+        private final int nowInSeconds;
 
-        private RowUpdate(Clustering clustering, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+        private RowUpdate(Clustering clustering, ModificationStatement stmt, 
QueryOptions options, long timestamp, int nowInSeconds)
         {
             this.clustering = clustering;
             this.stmt = stmt;
             this.options = options;
             this.timestamp = timestamp;
+            this.nowInSeconds = nowInSeconds;
         }
 
-        public void applyUpdates(FilteredPartition current, 
PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
+        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder)
         {
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.singletonMap(key, current) : null;
-            UpdateParameters params = new UpdateParameters(metadata, 
updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            UpdateParameters params =
+                new UpdateParameters(metadata,
+                                     updateBuilder.columns(),
+                                     options,
+                                     timestamp,
+                                     nowInSeconds,
+                                     stmt.getTimeToLive(options),
+                                     map);
             stmt.addUpdateForKey(updateBuilder, clustering, params);
         }
     }
@@ -285,20 +291,29 @@ public class CQL3CasRequest implements CASRequest
         private final ModificationStatement stmt;
         private final QueryOptions options;
         private final long timestamp;
+        private final int nowInSeconds;
 
-        private RangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+        private RangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp, int nowInSeconds)
         {
             this.slice = slice;
             this.stmt = stmt;
             this.options = options;
             this.timestamp = timestamp;
+            this.nowInSeconds = nowInSeconds;
         }
 
-        public void applyUpdates(FilteredPartition current, 
PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
+        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder)
         {
             // No slice statements currently require a read, but this 
maintains consistency with RowUpdate, and future proofs us
-            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
-            UpdateParameters params = new UpdateParameters(metadata, 
updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.singletonMap(key, current) : null;
+            UpdateParameters params =
+                new UpdateParameters(metadata,
+                                     updateBuilder.columns(),
+                                     options,
+                                     timestamp,
+                                     nowInSeconds,
+                                     stmt.getTimeToLive(options),
+                                     map);
             stmt.addUpdateForKey(updateBuilder, slice, params);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 5f3d07f..13fc659 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -465,15 +465,19 @@ public abstract class ModificationStatement implements CQLStatement
         else
             cl.validateForWrite(metadata.keyspace);
 
-        Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState), queryStartNanoTime);
+        Collection<? extends IMutation> mutations =
+            getMutations(options,
+                         false,
+                         options.getTimestamp(queryState),
+                         options.getNowInSeconds(queryState),
+                         queryStartNanoTime);
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false, 
queryStartNanoTime);
 
         return null;
     }
 
-    public ResultMessage executeWithCondition(QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
-    throws RequestExecutionException, RequestValidationException
+    private ResultMessage executeWithCondition(QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
     {
         CQL3CasRequest request = makeCasRequest(queryState, options);
 
@@ -484,10 +488,10 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                    
options.getSerialConsistency(),
                                                    options.getConsistency(),
                                                    queryState.getClientState(),
-                                                   options.getNowInSeconds(),
+                                                   
options.getNowInSeconds(queryState),
                                                    queryStartNanoTime))
         {
-            return new ResultMessage.Rows(buildCasResultSet(result, options));
+            return new ResultMessage.Rows(buildCasResultSet(result, 
queryState, options));
         }
     }
 
@@ -500,17 +504,18 @@ public abstract class ModificationStatement implements 
CQLStatement
                    type.isUpdate()? "updates" : "deletions");
 
         DecoratedKey key = metadata().partitioner.decorateKey(keys.get(0));
-        long now = options.getTimestamp(queryState);
+        long timestamp = options.getTimestamp(queryState);
+        int nowInSeconds = options.getNowInSeconds(queryState);
 
         checkFalse(restrictions.clusteringKeyRestrictionsHasIN(),
                    "IN on the clustering key columns is not supported with 
conditional %s",
                     type.isUpdate()? "updates" : "deletions");
 
         Clustering clustering = 
Iterables.getOnlyElement(createClustering(options));
-        CQL3CasRequest request = new CQL3CasRequest(metadata(), key, false, 
conditionColumns(), updatesRegularRows(), updatesStaticRow());
+        CQL3CasRequest request = new CQL3CasRequest(metadata(), key, 
conditionColumns(), updatesRegularRows(), updatesStaticRow());
 
         addConditions(clustering, request, options);
-        request.addRowUpdate(clustering, this, options, now);
+        request.addRowUpdate(clustering, this, options, timestamp, 
nowInSeconds);
 
         return request;
     }
@@ -533,13 +538,18 @@ public abstract class ModificationStatement implements 
CQLStatement
         return new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, 
BooleanType.instance);
     }
 
-    private ResultSet buildCasResultSet(RowIterator partition, QueryOptions 
options) throws InvalidRequestException
+    private ResultSet buildCasResultSet(RowIterator partition, QueryState 
state, QueryOptions options)
     {
-        return buildCasResultSet(keyspace(), columnFamily(), partition, 
getColumnsWithConditions(), false, options);
+        return buildCasResultSet(keyspace(), columnFamily(), partition, 
getColumnsWithConditions(), false, state, options);
     }
 
-    public static ResultSet buildCasResultSet(String ksName, String tableName, 
RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean 
isBatch, QueryOptions options)
-    throws InvalidRequestException
+    static ResultSet buildCasResultSet(String ksName,
+                                       String tableName,
+                                       RowIterator partition,
+                                       Iterable<ColumnMetadata> 
columnsWithConditions,
+                                       boolean isBatch,
+                                       QueryState state,
+                                       QueryOptions options)
     {
         boolean success = partition == null;
 
@@ -547,7 +557,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         List<List<ByteBuffer>> rows = 
Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);
-        return success ? rs : merge(rs, buildCasFailureResultSet(partition, 
columnsWithConditions, isBatch, options));
+        return success ? rs : merge(rs, buildCasFailureResultSet(partition, 
columnsWithConditions, isBatch, options, options.getNowInSeconds(state)));
     }
 
     private static ResultSet merge(ResultSet left, ResultSet right)
@@ -573,8 +583,11 @@ public abstract class ModificationStatement implements 
CQLStatement
         return new ResultSet(new ResultSet.ResultMetadata(EMPTY_HASH, specs), 
rows);
     }
 
-    private static ResultSet buildCasFailureResultSet(RowIterator partition, 
Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions 
options)
-    throws InvalidRequestException
+    private static ResultSet buildCasFailureResultSet(RowIterator partition,
+                                                      Iterable<ColumnMetadata> 
columnsWithConditions,
+                                                      boolean isBatch,
+                                                      QueryOptions options,
+                                                      int nowInSeconds)
     {
         TableMetadata metadata = partition.metadata();
         Selection selection;
@@ -598,10 +611,8 @@ public abstract class ModificationStatement implements 
CQLStatement
 
         Selectors selectors = selection.newSelectors(options);
         ResultSetBuilder builder = new 
ResultSetBuilder(selection.getResultMetadata(), selectors);
-        SelectStatement.forSelection(metadata, 
selection).processPartition(partition,
-                                                                      options,
-                                                                      builder,
-                                                                      
options.getNowInSeconds());
+        SelectStatement.forSelection(metadata, selection)
+                       .processPartition(partition, options, builder, 
nowInSeconds);
 
         return builder.build();
     }
@@ -613,25 +624,29 @@ public abstract class ModificationStatement implements 
CQLStatement
                : executeInternalWithoutCondition(queryState, options, 
System.nanoTime());
     }
 
-    public ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options, long queryStartNanoTime)
+    throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(options, true, 
queryState.getTimestamp(), queryStartNanoTime))
+        long timestamp = options.getTimestamp(queryState);
+        int nowInSeconds = options.getNowInSeconds(queryState);
+        for (IMutation mutation : getMutations(options, true, timestamp, 
nowInSeconds, queryStartNanoTime))
             mutation.apply();
         return null;
     }
 
-    public ResultMessage executeInternalWithCondition(QueryState state, 
QueryOptions options) throws RequestValidationException, 
RequestExecutionException
+    public ResultMessage executeInternalWithCondition(QueryState state, 
QueryOptions options)
     {
         CQL3CasRequest request = makeCasRequest(state, options);
-        try (RowIterator result = casInternal(request, state, 
options.getNowInSeconds()))
+
+        try (RowIterator result = casInternal(request, 
options.getTimestamp(state), options.getNowInSeconds(state)))
         {
-            return new ResultMessage.Rows(buildCasResultSet(result, options));
+            return new ResultMessage.Rows(buildCasResultSet(result, state, 
options));
         }
     }
 
-    static RowIterator casInternal(CQL3CasRequest request, QueryState state, 
int nowInSeconds)
+    static RowIterator casInternal(CQL3CasRequest request, long timestamp, int 
nowInSeconds)
     {
-        UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
+        UUID ballot = UUIDGen.getTimeUUIDFromMicros(timestamp);
 
         SinglePartitionReadQuery readCommand = 
request.readCommand(nowInSeconds);
         FilteredPartition current;
@@ -661,10 +676,14 @@ public abstract class ModificationStatement implements 
CQLStatement
      *
      * @return list of the mutations
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long timestamp, long queryStartNanoTime)
+    private Collection<? extends IMutation> getMutations(QueryOptions options,
+                                                         boolean local,
+                                                         long timestamp,
+                                                         int nowInSeconds,
+                                                         long 
queryStartNanoTime)
     {
         UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, 
updatedColumns, 1);
-        addUpdates(collector, options, local, timestamp, queryStartNanoTime);
+        addUpdates(collector, options, local, timestamp, nowInSeconds, 
queryStartNanoTime);
         return collector.toMutations();
     }
 
@@ -672,6 +691,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                           QueryOptions options,
                           boolean local,
                           long timestamp,
+                          int nowInSeconds,
                           long queryStartNanoTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -690,6 +710,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                            DataLimits.NONE,
                                                            local,
                                                            timestamp,
+                                                           nowInSeconds,
                                                            queryStartNanoTime);
             for (ByteBuffer key : keys)
             {
@@ -710,7 +731,7 @@ public abstract class ModificationStatement implements 
CQLStatement
             if (restrictions.hasClusteringColumnsRestrictions() && 
clusterings.isEmpty())
                 return;
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
options, local, timestamp, queryStartNanoTime);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, 
options, local, timestamp, nowInSeconds, queryStartNanoTime);
 
             for (ByteBuffer key : keys)
             {
@@ -754,6 +775,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                   QueryOptions options,
                                                   boolean local,
                                                   long timestamp,
+                                                  int nowInSeconds,
                                                   long queryStartNanoTime)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
@@ -763,6 +785,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                                         DataLimits.cqlLimits(1),
                                         local,
                                         timestamp,
+                                        nowInSeconds,
                                         queryStartNanoTime);
 
         return makeUpdateParameters(keys,
@@ -771,6 +794,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                                     DataLimits.NONE,
                                     local,
                                     timestamp,
+                                    nowInSeconds,
                                     queryStartNanoTime);
     }
 
@@ -780,11 +804,26 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                   DataLimits limits,
                                                   boolean local,
                                                   long timestamp,
+                                                  int nowInSeconds,
                                                   long queryStartNanoTime)
     {
         // Some lists operation requires reading
-        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, 
limits, local, options.getConsistency(), options.getNowInSeconds(), 
queryStartNanoTime);
-        return new UpdateParameters(metadata(), updatedColumns(), options, 
getTimestamp(timestamp, options), getTimeToLive(options), lists);
+        Map<DecoratedKey, Partition> lists =
+            readRequiredLists(keys,
+                              filter,
+                              limits,
+                              local,
+                              options.getConsistency(),
+                              nowInSeconds,
+                              queryStartNanoTime);
+
+        return new UpdateParameters(metadata(),
+                                    updatedColumns(),
+                                    options,
+                                    getTimestamp(timestamp, options),
+                                    nowInSeconds,
+                                    getTimeToLive(options),
+                                    lists);
     }
 
     private Slices toSlices(SortedSet<ClusteringBound> startBounds, 
SortedSet<ClusteringBound> endBounds)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 61715b9..f847a6e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -66,7 +66,6 @@ import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -227,14 +226,14 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by 
RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(QueryState state, QueryOptions options, 
long queryStartNanoTime) throws RequestExecutionException, 
RequestValidationException
+    public ResultMessage.Rows execute(QueryState state, QueryOptions options, 
long queryStartNanoTime)
     {
         ConsistencyLevel cl = options.getConsistency();
         checkNotNull(cl, "Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
-        int nowInSec = options.getNowInSeconds();
+        int nowInSec = options.getNowInSeconds(state);
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
         int pageSize = options.getPageSize();
@@ -428,7 +427,7 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeLocally(QueryState state, QueryOptions 
options) throws RequestExecutionException, RequestValidationException
     {
-        return executeInternal(state, options, options.getNowInSeconds(), 
System.nanoTime());
+        return executeInternal(state, options, options.getNowInSeconds(state), 
System.nanoTime());
     }
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions 
options, int nowInSec, long queryStartNanoTime) throws 
RequestExecutionException, RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 0c2cf28..b24f595 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
@@ -43,10 +44,8 @@ import org.apache.cassandra.cql3.functions.UDHelper;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -250,15 +249,16 @@ public class CQLSSTableWriter implements Closeable
         List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
         SortedSet<Clustering> clusterings = insert.createClustering(options);
 
-        long now = System.currentTimeMillis() * 1000;
+        long now = System.currentTimeMillis();
         // Note that we ask indexes to not validate values (the last 'false' 
arg below) because that triggers a 'Keyspace.open'
         // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.metadata,
                                                        insert.updatedColumns(),
                                                        options,
-                                                       
insert.getTimestamp(now, options),
+                                                       
insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
+                                                       (int) 
TimeUnit.MILLISECONDS.toSeconds(now),
                                                        
insert.getTimeToLive(options),
-                                                       
Collections.<DecoratedKey, Partition>emptyMap());
+                                                       Collections.emptyMap());
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java 
b/src/java/org/apache/cassandra/service/QueryState.java
index b266fb8..d1b03d4 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -22,7 +22,10 @@ import java.net.InetAddress;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
- * Represents the state related to a given query.
+ * Primarily used as a recorder for server-generated timestamps (timestamp, in 
microseconds, and nowInSeconds - in, well, seconds).
+ *
+ * The goal is to be able to use a single consistent server-generated value 
for both timestamps across the whole request,
+ * and later be able to inspect QueryState for the generated values - for 
logging or other purposes.
  */
 public class QueryState
 {
@@ -37,6 +40,41 @@ public class QueryState
     }
 
     /**
+     * Generate, cache, and record a timestamp value on the server-side.
+     *
+     * Used in writes for all live and expiring cells, and all kinds of 
deletion infos.
+     *
+     * Shouldn't be used directly. {@link 
org.apache.cassandra.cql3.QueryOptions#getTimestamp(QueryState)} should be used
+     * by all consumers.
+     *
+     * @return server-generated, recorded timestamp in microseconds
+     */
+    public long getTimestamp()
+    {
+        if (timestamp == Long.MIN_VALUE)
+            timestamp = clientState.getTimestamp();
+        return timestamp;
+    }
+
+    /**
+     * Generate, cache, and record a nowInSeconds value on the server-side.
+     *
+     * In writes, used for calculating localDeletionTime for tombstones, 
expiring cells, and other deletion infos.
+     * In reads, used to determine liveness of expiring cells and rows.
+     *
+     * Shouldn't be used directly. {@link 
org.apache.cassandra.cql3.QueryOptions#getNowInSeconds(QueryState)} should be 
used
+     * by all consumers.
+     *
+     * @return server-generated, recorded timestamp in seconds
+     */
+    public int getNowInSeconds()
+    {
+        if (nowInSeconds == Integer.MIN_VALUE)
+            nowInSeconds = FBUtilities.nowInSeconds();
+        return nowInSeconds;
+    }
+
+    /**
      * @return a QueryState object for internal C* calls (not limited by any 
kind of auth).
      */
     public static QueryState forInternalCalls()
@@ -53,18 +91,4 @@ public class QueryState
     {
         return clientState.getClientAddress();
     }
-
-    public long getTimestamp()
-    {
-        if (timestamp == Long.MIN_VALUE)
-            timestamp = clientState.getTimestamp();
-        return timestamp;
-    }
-
-    public int getNowInSeconds()
-    {
-        if (nowInSeconds == Integer.MIN_VALUE)
-            nowInSeconds = FBUtilities.nowInSeconds();
-        return nowInSeconds;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/test/unit/org/apache/cassandra/cql3/ListsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java 
b/test/unit/org/apache/cassandra/cql3/ListsTest.java
index a377b96..1155619 100644
--- a/test/unit/org/apache/cassandra/cql3/ListsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 public class ListsTest extends CQLTester
@@ -140,7 +141,8 @@ public class ListsTest extends CQLTester
 
         ByteBuffer keyBuf = ByteBufferUtil.bytes("key");
         DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf);
-        UpdateParameters parameters = new UpdateParameters(metaData, null, 
QueryOptions.DEFAULT, System.currentTimeMillis(), 1000, Collections.emptyMap());
+        UpdateParameters parameters =
+            new UpdateParameters(metaData, null, QueryOptions.DEFAULT, 
System.currentTimeMillis(), FBUtilities.nowInSeconds(), 1000, 
Collections.emptyMap());
         Clustering clustering = Clustering.make(ByteBufferUtil.bytes(1));
         parameters.newRow(clustering);
         prepender.execute(key, parameters);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java 
b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 9d07321..42ffa26 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event.TopologyChange;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.StatusChange;
@@ -365,6 +367,8 @@ public class SerDeserTest
         QueryOptions.codec.encode(options, buf, version);
         QueryOptions decodedOptions = QueryOptions.codec.decode(buf, version);
 
+        QueryState state = new QueryState(ClientState.forInternalCalls());
+
         assertNotNull(decodedOptions);
         assertEquals(options.getConsistency(), 
decodedOptions.getConsistency());
         assertEquals(options.getSerialConsistency(), 
decodedOptions.getSerialConsistency());
@@ -374,6 +378,7 @@ public class SerDeserTest
         assertEquals(options.getPagingState(), 
decodedOptions.getPagingState());
         assertEquals(options.skipMetadata(), decodedOptions.skipMetadata());
         assertEquals(options.getKeyspace(), decodedOptions.getKeyspace());
-        assertEquals(options.getNowInSeconds(), 
decodedOptions.getNowInSeconds());
+        assertEquals(options.getTimestamp(state), 
decodedOptions.getTimestamp(state));
+        assertEquals(options.getNowInSeconds(state), 
decodedOptions.getNowInSeconds(state));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aed68251/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index 7dabe84..dea4eeb 100644
--- 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++ 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -45,7 +46,6 @@ import org.apache.cassandra.cql3.functions.UDHelper;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -246,15 +246,16 @@ public class StressCQLSSTableWriter implements Closeable
         List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
         SortedSet<Clustering> clusterings = insert.createClustering(options);
 
-        long now = System.currentTimeMillis() * 1000;
+        long now = System.currentTimeMillis();
         // Note that we ask indexes to not validate values (the last 'false' 
arg below) because that triggers a 'Keyspace.open'
         // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.metadata(),
                                                        insert.updatedColumns(),
                                                        options,
-                                                       
insert.getTimestamp(now, options),
+                                                       
insert.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
+                                                       (int) 
TimeUnit.MILLISECONDS.toSeconds(now),
                                                        
insert.getTimeToLive(options),
-                                                       
Collections.<DecoratedKey, Partition>emptyMap());
+                                                       Collections.emptyMap());
 
         try
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to