Repository: cassandra
Updated Branches:
  refs/heads/trunk aa6233aa1 -> d5b2fa206


Make paxos reuse the timestamp generation of normal operation

patch by slebresne; reviewed by iamaleksey for CASSANDRA-7801


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/85ea3735
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/85ea3735
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/85ea3735

Branch: refs/heads/trunk
Commit: 85ea37356e666c2780294bbd29daa89a32ebf333
Parents: cdf80d9
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Thu Nov 6 16:38:27 2014 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Thu Nov 6 16:38:27 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../cql3/statements/BatchStatement.java         |   7 +-
 .../cql3/statements/ModificationStatement.java  |   3 +-
 .../cql3/statements/SelectStatement.java        |   8 +-
 .../apache/cassandra/service/ClientState.java   |  36 +++++
 .../apache/cassandra/service/QueryState.java    |   5 +-
 .../apache/cassandra/service/StorageProxy.java  | 139 ++++++++++++++-----
 .../service/pager/MultiPartitionPager.java      |  13 +-
 .../service/pager/NamesQueryPager.java          |   7 +-
 .../cassandra/service/pager/QueryPagers.java    |  30 ++--
 .../service/pager/SliceQueryPager.java          |  11 +-
 .../cassandra/thrift/CassandraServer.java       |  35 +++--
 12 files changed, 206 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e60f3a..5348f2f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+2.1.3
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+
 2.1.2
  * (cqlsh) parse_for_table_meta errors out on queries with undefined
    grammars (CASSANDRA-8262)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 17d1771..d54e4fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -275,7 +275,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             throw new InvalidRequestException("Invalid empty serial consistency level");
 
         if (hasConditions)
-            return executeWithConditions(options, now);
+            return executeWithConditions(options, queryState);
 
         executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
         return new ResultMessage.Void();
@@ -297,9 +297,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
-    private ResultMessage executeWithConditions(BatchQueryOptions options, long now)
+    private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state)
     throws RequestExecutionException, RequestValidationException
     {
+        long now = state.getTimestamp();
         ByteBuffer key = null;
         String ksName = null;
         String cfName = null;
@@ -339,7 +340,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             casRequest.addRowUpdate(clusteringPrefix, statement, statementOptions, timestamp);
         }
 
-        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency());
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency(), state.getClientState());
 
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true, options.forStatement(0)));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 974ccc8..846ad3e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -524,7 +524,8 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                                                key,
                                                request,
                                                options.getSerialConsistency(),
-                                               options.getConsistency());
+                                               options.getConsistency(),
+                                               queryState.getClientState());
         return new ResultMessage.Rows(buildCasResultSet(key, result, options));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2632ee2..84cbdc0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -212,11 +212,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
         {
-            return execute(command, options, limit, now);
+            return execute(command, options, limit, now, state);
         }
         else
         {
-            QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
+            QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
             if (parameters.isCount)
                 return pageCountQuery(pager, options, pageSize, now, limit);
 
@@ -250,7 +250,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return getPageableCommand(options, getLimit(options), System.currentTimeMillis());
     }
 
-    private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) throws RequestValidationException, RequestExecutionException
     {
         List<Row> rows;
         if (command == null)
@@ -260,7 +260,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         else
         {
             rows = command instanceof Pageable.ReadCommands
-                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency())
+                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState())
                  : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index b4b162c..e1df1bd 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 import java.net.SocketAddress;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -99,6 +100,9 @@ public class ClientState
     // The remote address of the client - null for internal clients.
     private final SocketAddress remoteAddress;
 
+    // The biggest timestamp that was returned by getTimestamp/assigned to a query
+    private final AtomicLong lastTimestampMicros = new AtomicLong(0);
+
     /**
      * Construct a new, empty ClientState for internal calls.
      */
@@ -132,6 +136,38 @@ public class ClientState
         return new ClientState(remoteAddress);
     }
 
+    /**
+     * This clock guarantees that updates for the same ClientState will be ordered
+     * in the sequence seen, even if multiple updates happen in the same millisecond.
+     */
+    public long getTimestamp()
+    {
+        while (true)
+        {
+            long current = System.currentTimeMillis() * 1000;
+            long last = lastTimestampMicros.get();
+            long tstamp = last >= current ? last + 1 : current;
+            if (lastTimestampMicros.compareAndSet(last, tstamp))
+                return tstamp;
+        }
+    }
+
+    /**
+     * Can be used when a timestamp has been assigned by a query, but that timestamp is
+     * not directly one returned by getTimestamp() (see SP.beginAndRepairPaxos()).
+     * This ensures that following calls to getTimestamp() will return a timestamp strictly
+     * greater than the one provided to this method.
+     */
+    public void updateLastTimestamp(long tstampMicros)
+    {
+        while (true)
+        {
+            long last = lastTimestampMicros.get();
+            if (tstampMicros <= last || lastTimestampMicros.compareAndSet(last, tstampMicros))
+                return;
+        }
+    }
+
     public static QueryHandler getCQLQueryHandler()
     {
         return cqlQueryHandler;
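
For reference, the following is a minimal standalone sketch (not part of this commit; the class and method names are illustrative) of the guarantee the new per-ClientState clock provides: the compare-and-set loop hands out strictly increasing microsecond timestamps even when many calls land in the same millisecond.

    // Standalone sketch mirroring the logic of ClientState.getTimestamp() above.
    // Names are illustrative only; this is not shipped with the patch.
    import java.util.concurrent.atomic.AtomicLong;

    public class MonotonicMicrosClockDemo
    {
        private static final AtomicLong lastMicros = new AtomicLong(0);

        // Same compare-and-set loop as the new ClientState.getTimestamp()
        static long nextTimestampMicros()
        {
            while (true)
            {
                long current = System.currentTimeMillis() * 1000;
                long last = lastMicros.get();
                long tstamp = last >= current ? last + 1 : current;
                if (lastMicros.compareAndSet(last, tstamp))
                    return tstamp;
            }
        }

        public static void main(String[] args)
        {
            long previous = nextTimestampMicros();
            for (int i = 0; i < 1_000_000; i++)
            {
                long next = nextTimestampMicros();
                // even calls within the same millisecond get distinct, increasing values
                assert next > previous : "timestamps must be strictly increasing";
                previous = next;
            }
            System.out.println("1,000,000 strictly increasing timestamps generated");
        }
    }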

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 0179a3e..9891ba0 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.utils.FBUtilities;
 public class QueryState
 {
     private final ClientState clientState;
-    private volatile long clock;
     private volatile UUID preparedTracingSession;
 
     public QueryState(ClientState clientState)
@@ -56,9 +55,7 @@ public class QueryState
      */
     public long getTimestamp()
     {
-        long current = System.currentTimeMillis() * 1000;
-        clock = clock >= current ? clock + 1 : current;
-        return clock;
+        return clientState.getTimestamp();
     }
 
     public boolean traceNextQuery()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 72f9e15..d45e74b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -201,7 +201,8 @@ public class StorageProxy implements StorageProxyMBean
                                    ByteBuffer key,
                                    CASRequest request,
                                    ConsistencyLevel consistencyForPaxos,
-                                   ConsistencyLevel consistencyForCommit)
+                                   ConsistencyLevel consistencyForCommit,
+                                   ClientState state)
     throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
         final long start = System.nanoTime();
@@ -221,7 +222,7 @@ public class StorageProxy implements StorageProxyMBean
                 List<InetAddress> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true);
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.left;
                 contentions += pair.right;
                 // read the current values and check they validate the conditions
@@ -324,7 +325,15 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static Pair<UUID, Integer> beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, final boolean isWrite)
+    private static Pair<UUID, Integer> beginAndRepairPaxos(long start,
+                                                           ByteBuffer key,
+                                                           CFMetaData metadata,
+                                                           List<InetAddress> liveEndpoints,
+                                                           int requiredParticipants,
+                                                           ConsistencyLevel consistencyForPaxos,
+                                                           ConsistencyLevel consistencyForCommit,
+                                                           final boolean isWrite,
+                                                           ClientState state)
     throws WriteTimeoutException
     {
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
@@ -333,9 +342,13 @@ public class StorageProxy implements StorageProxyMBean
         int contentions = 0;
         while (System.nanoTime() - start < timeout)
         {
+            // We don't want to use a timestamp that is older than the last one assigned by the ClientState or operations
+            // may appear out-of-order (#7801). But note that state.getTimestamp() is in microseconds while the ballot
+            // timestamp is only in milliseconds
+            long currentTime = (state.getTimestamp() / 1000) + 1;
             long ballotMillis = summary == null
-                              ? System.currentTimeMillis()
-                              : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot));
+                              ? currentTime
+                              : Math.max(currentTime, 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot));
             UUID ballot = UUIDGen.getTimeUUID(ballotMillis);
 
             // prepare
@@ -394,6 +407,10 @@ public class StorageProxy implements StorageProxyMBean
                 continue;
             }
 
+            // We might commit this ballot and we want to ensure operations starting after this CAS succeeds will be assigned
+            // a timestamp greater than the one of this ballot, so operation order is preserved (#7801)
+            state.updateLastTimestamp(ballotMillis * 1000);
+
             return Pair.create(ballot, contentions);
         }
 
@@ -1134,11 +1151,19 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
+    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
+    {
+        // When using serial CL, the ClientState should be provided
+        assert !consistencyLevel.isSerialConsistency();
+        return read(commands, consistencyLevel, null);
+    }
+
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
      * a specific set of column names from a given column family.
      */
-    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
+    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
     throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
     {
         if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
@@ -1148,68 +1173,106 @@ public class StorageProxy implements StorageProxyMBean
             throw new IsBootstrappingException();
         }
 
+        return consistencyLevel.isSerialConsistency()
+             ? readWithPaxos(commands, consistencyLevel, state)
+             : readRegular(commands, consistencyLevel);
+    }
+
+    private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
+    throws InvalidRequestException, UnavailableException, ReadTimeoutException
+    {
+        assert state != null;
+
         long start = System.nanoTime();
         List<Row> rows = null;
+
         try
         {
-            if (consistency_level.isSerialConsistency())
-            {
-                // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-                if (commands.size() > 1)
-                    throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time");
-                ReadCommand command = commands.get(0);
-
-                CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
-                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistency_level);
-                List<InetAddress> liveEndpoints = p.left;
-                int requiredParticipants = p.right;
+            // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
+            if (commands.size() > 1)
+                throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time");
+            ReadCommand command = commands.get(0);
 
-                // does the work of applying in-progress writes; throws UAE or timeout if it can't
-                final ConsistencyLevel consistencyForCommitOrFetch = consistency_level == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
-                try
-                {
-                    final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch, false);
-                    if(pair.right > 0)
-                        casReadMetrics.contention.update(pair.right);
-                }
-                catch (WriteTimeoutException e)
-                {
-                    throw new ReadTimeoutException(consistency_level, 0, consistency_level.blockFor(Keyspace.open(command.ksName)), false);
-                }
+            CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
+            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistencyLevel);
+            List<InetAddress> liveEndpoints = p.left;
+            int requiredParticipants = p.right;
 
-                rows = fetchRows(commands, consistencyForCommitOrFetch);
+            // does the work of applying in-progress writes; throws UAE or timeout if it can't
+            final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
+                                                                                   ? ConsistencyLevel.LOCAL_QUORUM
+                                                                                   : ConsistencyLevel.QUORUM;
+            try
+            {
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
+                if (pair.right > 0)
+                    casReadMetrics.contention.update(pair.right);
             }
-            else
+            catch (WriteTimeoutException e)
             {
-                rows = fetchRows(commands, consistency_level);
+                throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false);
             }
+
+            rows = fetchRows(commands, consistencyForCommitOrFetch);
+        }
+        catch (UnavailableException e)
+        {
+            readMetrics.unavailables.mark();
+            ClientRequestMetrics.readUnavailables.inc();
+            casReadMetrics.unavailables.mark();
+            throw e;
+        }
+        catch (ReadTimeoutException e)
+        {
+            readMetrics.timeouts.mark();
+            ClientRequestMetrics.readTimeouts.inc();
+            casReadMetrics.timeouts.mark();
+            throw e;
+        }
+        finally
+        {
+            long latency = System.nanoTime() - start;
+            readMetrics.addNano(latency);
+            casReadMetrics.addNano(latency);
+            // TODO avoid giving every command the same latency number.  Can fix this in CASSANDRA-5329
+            for (ReadCommand command : commands)
+                Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
+        }
+
+        return rows;
+    }
+
+    private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    throws UnavailableException, ReadTimeoutException
+    {
+        long start = System.nanoTime();
+        List<Row> rows = null;
+
+        try
+        {
+            rows = fetchRows(commands, consistencyLevel);
         }
         catch (UnavailableException e)
         {
             readMetrics.unavailables.mark();
             ClientRequestMetrics.readUnavailables.inc();
-            if(consistency_level.isSerialConsistency())
-                casReadMetrics.unavailables.mark();
             throw e;
         }
         catch (ReadTimeoutException e)
         {
             readMetrics.timeouts.mark();
             ClientRequestMetrics.readTimeouts.inc();
-            if(consistency_level.isSerialConsistency())
-                casReadMetrics.timeouts.mark();
             throw e;
         }
         finally
         {
             long latency = System.nanoTime() - start;
             readMetrics.addNano(latency);
-            if(consistency_level.isSerialConsistency())
-                casReadMetrics.addNano(latency);
             // TODO avoid giving every command the same latency number.  Can fix this in CASSANDRA-5329
             for (ReadCommand command : commands)
                 Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
+
         return rows;
     }
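
To summarize the timestamp handling above, here is a simplified sketch (not the actual StorageProxy code; the helper names are illustrative) of how the Paxos ballot time and the ClientState clock interact: the ballot millisecond is derived from the client clock (microseconds rounded up to the next millisecond), and once the ballot may be committed the clock is pushed past it, so later regular operations are assigned strictly larger timestamps (CASSANDRA-7801).

    // Simplified sketch of the ballot/clock interplay; not shipped code.
    import java.util.concurrent.atomic.AtomicLong;

    public class BallotClockSketch
    {
        // stands in for ClientState.lastTimestampMicros
        private final AtomicLong lastTimestampMicros = new AtomicLong(0);

        long getTimestampMicros()
        {
            while (true)
            {
                long current = System.currentTimeMillis() * 1000;
                long last = lastTimestampMicros.get();
                long tstamp = last >= current ? last + 1 : current;
                if (lastTimestampMicros.compareAndSet(last, tstamp))
                    return tstamp;
            }
        }

        void updateLastTimestamp(long tstampMicros)
        {
            long last;
            while ((last = lastTimestampMicros.get()) < tstampMicros
                   && !lastTimestampMicros.compareAndSet(last, tstampMicros))
                ;
        }

        long nextBallotMillis()
        {
            // micros -> millis, plus one so the ballot is never behind the client clock
            // (the real code also takes the max with any in-progress ballot's timestamp)
            long ballotMillis = (getTimestampMicros() / 1000) + 1;
            // once the ballot may be committed, push the clock past it (millis -> micros)
            // so later regular writes get strictly larger timestamps
            updateLastTimestamp(ballotMillis * 1000);
            return ballotMillis;
        }

        public static void main(String[] args)
        {
            BallotClockSketch clock = new BallotClockSketch();
            long ballotMillis = clock.nextBallotMillis();
            long nextWriteMicros = clock.getTimestampMicros();
            // a write issued after the CAS is ordered after the ballot
            assert nextWriteMicros > ballotMillis * 1000;
            System.out.println("ballot at " + ballotMillis + " ms, next write at " + nextWriteMicros + " us");
        }
    }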
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 35d6752..6ed635f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Pager over a list of ReadCommand.
@@ -46,7 +47,7 @@ class MultiPartitionPager implements QueryPager
     private int remaining;
     private int current;
 
-    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
     {
         int i = 0;
         // If it's not the beginning (state != null), we need to find where we were and skip previous commands
@@ -65,7 +66,7 @@ class MultiPartitionPager implements QueryPager
 
         pagers = new SinglePartitionPager[commands.size() - i];
         // 'i' is on the first non exhausted pager for the previous page (or the first one)
-        pagers[0] = makePager(commands.get(i), consistencyLevel, localQuery, state);
+        pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state);
         timestamp = commands.get(i).timestamp;
 
         // Following ones haven't been started yet
@@ -74,16 +75,16 @@ class MultiPartitionPager implements QueryPager
             ReadCommand command = commands.get(j);
             if (command.timestamp != timestamp)
                 throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
-            pagers[j - i] = makePager(command, consistencyLevel, localQuery, null);
+            pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null);
         }
         remaining = state == null ? computeRemaining(pagers) : state.remaining;
     }
 
-    private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
     {
         return command instanceof SliceFromReadCommand
-             ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, state)
-             : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+             ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state)
+             : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery);
     }
 
     private static int computeRemaining(SinglePartitionPager[] pagers)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
index 663db22..d03e582 100644
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnCounter;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 
 /**
@@ -34,6 +35,7 @@ public class NamesQueryPager implements SinglePartitionPager
 {
     private final SliceByNamesReadCommand command;
     private final ConsistencyLevel consistencyLevel;
+    private final ClientState state;
     private final boolean localQuery;
 
     private volatile boolean queried;
@@ -49,10 +51,11 @@ public class NamesQueryPager implements SinglePartitionPager
      * count every cell individually) and the names filter asks for more than pageSize columns.
      */
     // Don't use directly, use QueryPagers method instead
-    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, boolean localQuery)
     {
         this.command = command;
         this.consistencyLevel = consistencyLevel;
+        this.state = state;
         this.localQuery = localQuery;
     }
 
@@ -87,7 +90,7 @@ public class NamesQueryPager implements SinglePartitionPager
         queried = true;
         return localQuery
              ? Collections.singletonList(command.getRow(Keyspace.open(command.ksName)))
-             : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel);
+             : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel, state);
     }
 
     public int maxRemaining()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 04702d0..c03e8ec 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Static utility methods to create query pagers.
@@ -82,27 +83,27 @@ public class QueryPagers
         }
     }
 
-    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
+    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
     {
         if (command instanceof SliceByNamesReadCommand)
-            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, local);
         else
-            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local, state);
+            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, local, state);
     }
 
-    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
+    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
     {
         if (command instanceof Pageable.ReadCommands)
         {
             List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
             if (commands.size() == 1)
-                return pager(commands.get(0), consistencyLevel, local, state);
+                return pager(commands.get(0), consistencyLevel, cState, local, state);
 
-            return new MultiPartitionPager(commands, consistencyLevel, local, state);
+            return new MultiPartitionPager(commands, consistencyLevel, cState, local, state);
         }
         else if (command instanceof ReadCommand)
         {
-            return pager((ReadCommand)command, consistencyLevel, local, state);
+            return pager((ReadCommand)command, consistencyLevel, cState, local, state);
         }
         else
         {
@@ -115,19 +116,19 @@ public class QueryPagers
         }
     }
 
-    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
+    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState)
     {
-        return pager(command, consistencyLevel, false, null);
+        return pager(command, consistencyLevel, cState, false, null);
     }
 
-    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, PagingState state)
+    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, PagingState state)
     {
-        return pager(command, consistencyLevel, false, state);
+        return pager(command, consistencyLevel, cState, false, state);
     }
 
     public static QueryPager localPager(Pageable command)
     {
-        return pager(command, null, true, null);
+        return pager(command, null, null, true, null);
     }
 
     /**
@@ -137,7 +138,7 @@ public class QueryPagers
     public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
     {
         SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
-        final SliceQueryPager pager = new SliceQueryPager(command, null, true);
+        final SliceQueryPager pager = new SliceQueryPager(command, null, null, true);
 
         return new Iterator<ColumnFamily>()
         {
@@ -176,11 +177,12 @@ public class QueryPagers
                                  ByteBuffer key,
                                  SliceQueryFilter filter,
                                  ConsistencyLevel consistencyLevel,
+                                 ClientState cState,
                                  final int pageSize,
                                  long now) throws RequestValidationException, RequestExecutionException
     {
         SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
-        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
+        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false);
 
         ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
         while (!pager.isExhausted())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index cd1caf3..05c05b1 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,19 +40,21 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
     private static final Logger logger = LoggerFactory.getLogger(SliceQueryPager.class);
 
     private final SliceFromReadCommand command;
+    private final ClientState cstate;
 
     private volatile Composite lastReturned;
 
     // Don't use directly, use QueryPagers method instead
-    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery)
     {
         super(consistencyLevel, command.filter.count, localQuery, command.ksName, command.cfName, command.filter, command.timestamp);
         this.command = command;
+        this.cstate = cstate;
     }
 
-    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery, PagingState state)
     {
-        this(command, consistencyLevel, localQuery);
+        this(command, consistencyLevel, cstate, localQuery);
 
         if (state != null)
         {
@@ -86,7 +89,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
         ReadCommand pageCmd = command.withUpdatedFilter(filter);
         return localQuery
              ? Collections.singletonList(pageCmd.getRow(Keyspace.open(command.ksName)))
-             : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel);
+             : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel, cstate);
     }
 
     protected boolean containsPreviousLast(Row first)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index e7708df..6955e64 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -103,7 +103,7 @@ public class CassandraServer implements Cassandra.Iface
         return ThriftSessionManager.instance.currentSession();
     }
 
-    protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level)
+    protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         // TODO - Support multiple column families per row, right now row only contains 1 column family
@@ -115,7 +115,7 @@ public class CassandraServer implements Cassandra.Iface
             schedule(DatabaseDescriptor.getReadRpcTimeout());
             try
             {
-                rows = StorageProxy.read(commands, consistency_level);
+                rows = StorageProxy.read(commands, consistency_level, cState);
             }
             finally
             {
@@ -269,10 +269,10 @@ public class CassandraServer implements Cassandra.Iface
         return thriftSuperColumns;
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, boolean subColumnsOnly, org.apache.cassandra.db.ConsistencyLevel consistency_level)
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, boolean subColumnsOnly, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level);
+        Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level, cState);
         Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
         for (ReadCommand command: commands)
         {
@@ -322,7 +322,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level);
+            return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState);
         }
         catch (RequestValidationException e)
         {
@@ -339,10 +339,11 @@ public class CassandraServer implements Cassandra.Iface
                                                        ColumnParent column_parent,
                                                        long timestamp,
                                                        SlicePredicate predicate,
-                                                       ConsistencyLevel consistency_level)
+                                                       ConsistencyLevel consistency_level,
+                                                       ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level).get(key);
+        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level, cState).get(key);
     }
 
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
@@ -369,7 +370,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level);
+            return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState);
         }
         catch (RequestValidationException e)
         {
@@ -431,7 +432,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                              ColumnParent column_parent,
                                                                              long timestamp,
                                                                              SlicePredicate predicate,
-                                                                             ConsistencyLevel consistency_level)
+                                                                             ConsistencyLevel consistency_level,
+                                                                             ClientState cState)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
@@ -452,7 +454,7 @@ public class CassandraServer implements Cassandra.Iface
             commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), timestamp, filter.cloneShallow()));
         }
 
-        return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel);
+        return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel, cState);
     }
 
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
@@ -501,7 +503,7 @@ public class CassandraServer implements Cassandra.Iface
             long now = System.currentTimeMillis();
             ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
 
-            Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
+            Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState);
 
             ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
 
@@ -549,7 +551,7 @@ public class CassandraServer implements Cassandra.Iface
             long timestamp = System.currentTimeMillis();
 
             if (predicate.column_names != null)
-                return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level).size();
+                return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level, cState).size();
 
             int pageSize;
             // request by page if this is a large row
@@ -575,6 +577,7 @@ public class CassandraServer implements Cassandra.Iface
                                           key,
                                          filter,
                                          ThriftConversion.fromThrift(consistency_level),
+                                          cState,
                                           pageSize,
                                           timestamp);
         }
@@ -637,7 +640,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                                  column_parent,
                                                                                  System.currentTimeMillis(),
                                                                                  predicate,
-                                                                                 consistency_level);
+                                                                                 consistency_level,
+                                                                                 cState);
 
             for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
                 counts.put(cf.getKey(), cf.getValue().size());
@@ -787,7 +791,8 @@ public class CassandraServer implements Cassandra.Iface
                                                    key,
                                                    new ThriftCASRequest(cfExpected, cfUpdates),
                                                    ThriftConversion.fromThrift(serial_consistency_level),
-                                                   ThriftConversion.fromThrift(commit_consistency_level));
+                                                   ThriftConversion.fromThrift(commit_consistency_level),
+                                                   cState);
             return result == null
                  ? new CASResult(true)
                  : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(result.getSortedColumns(), System.currentTimeMillis()));
@@ -2083,7 +2088,7 @@ public class CassandraServer implements Cassandra.Iface
             SliceQueryFilter filter = new SliceQueryFilter(deoverlapped, request.reversed, request.count);
             ThriftValidation.validateKey(metadata, request.key);
             commands.add(ReadCommand.create(keyspace, request.key, request.column_parent.getColumn_family(), System.currentTimeMillis(), filter));
-            return getSlice(commands, request.column_parent.isSetSuper_column(), consistencyLevel).entrySet().iterator().next().getValue();
+            return getSlice(commands, request.column_parent.isSetSuper_column(), consistencyLevel, cState).entrySet().iterator().next().getValue();
         }
         catch (RequestValidationException e)
         {

Reply via email to