Updated Branches: refs/heads/trunk fe784f58e -> 5577ff626
run local range scans on the read stage
patch by jbellis; reviewed by vijay for CASSANDRA-3687
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5577ff62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5577ff62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5577ff62
Branch: refs/heads/trunk
Commit: 5577ff626bb38d419a3540e0c0ccb1a9d8b8680f
Parents: 29fed1f
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu Aug 16 15:43:02 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Aug 16 15:43:02 2012 -0500
----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../cassandra/service/AbstractRowResolver.java | 11 --
 .../org/apache/cassandra/service/ReadCallback.java | 27 ++---
 .../org/apache/cassandra/service/StorageProxy.java | 91 ++++++++-------
 4 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a2848d..75de54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * run local range scans on the read stage (CASSANDRA-3687)
  * clean up ioexceptions (CASSANDRA-2116)
  * Introduce new json format with row level deletion (CASSANDRA-4054)
  * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index b1647a2..beaf73c 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ 
b/src/java/org/apache/cassandra/service/AbstractRowResolver.java @@ -51,17 +51,6 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo replies.add(message); } - /** hack so local reads don't force de/serialization of an extra real Message */ - public void injectPreProcessed(ReadResponse result) - { - MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(), - result, - Collections.<String, byte[]>emptyMap(), - MessagingService.Verb.INTERNAL_RESPONSE, - MessagingService.current_version); - replies.add(message); - } - public Iterable<MessageIn<ReadResponse>> getMessages() { return replies; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index a3d273c..bfd0044 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -165,32 +166,20 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag /** * @return true if the message counts towards the blockfor threshold - * TODO turn the Message into a response so we don't need two versions of this method */ protected boolean waitingFor(MessageIn message) { return true; } - /** - * @return true if the response counts towards the blockfor threshold - */ - protected boolean waitingFor(ReadResponse response) + public void response(TMessage result) { - return true; - } - - public void response(ReadResponse result) - { - ((RowDigestResolver) 
resolver).injectPreProcessed(result); - int n = waitingFor(result) - ? received.incrementAndGet() - : received.get(); - if (n >= blockfor && resolver.isDataPresent()) - { - condition.signal(); - maybeResolveForRepair(); - } + MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + result, + Collections.<String, byte[]>emptyMap(), + MessagingService.Verb.INTERNAL_RESPONSE, + MessagingService.current_version); + response(message); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9d55739..1fb84cd 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -831,6 +831,30 @@ public class StorageProxy implements StorageProxyMBean } } + static class LocalRangeSliceRunnable extends DroppableRunnable + { + private final RangeSliceCommand command; + private final ReadCallback<RangeSliceReply, Iterable<Row>> handler; + private final long start = System.currentTimeMillis(); + + LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler) + { + super(MessagingService.Verb.READ); + this.command = command; + this.handler = handler; + } + + protected void runMayThrow() throws ExecutionException, InterruptedException + { + if (logger.isDebugEnabled()) + logger.debug("LocalReadRunnable reading " + command); + + RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command)); + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start); + handler.response(result); + } + } + static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, 
TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints) { if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM) @@ -868,33 +892,18 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right); DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); - if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())) + // collect replies and resolve according to consistency level + RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace); + ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints); + handler.assureSufficientLiveNodes(); + resolver.setSources(handler.endpoints); + if (handler.endpoints.size() == 1 && handler.endpoints.get(0).equals(FBUtilities.getBroadcastAddress())) { - if (logger.isDebugEnabled()) - logger.debug("local range slice"); - - try - { - rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd)); - for (Row row : rows) - columnsCount += row.getLiveColumnCount(); - } - catch (ExecutionException e) - { - throw new RuntimeException(e.getCause()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + logger.debug("reading data locally"); + StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler)); } else { - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace); - ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints); - handler.assureSufficientLiveNodes(); - resolver.setSources(handler.endpoints); 
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage(); for (InetAddress endpoint : handler.endpoints) { @@ -902,27 +911,27 @@ public class StorageProxy implements StorageProxyMBean if (logger.isDebugEnabled()) logger.debug("reading " + nodeCmd + " from " + endpoint); } + } - try - { - for (Row row : handler.get()) - { - rows.add(row); - columnsCount += row.getLiveColumnCount(); - logger.debug("range slices read {}", row.key); - } - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - if (logger.isDebugEnabled()) - logger.debug("Range slice timeout: {}", ex.toString()); - throw ex; - } - catch (DigestMismatchException e) + try + { + for (Row row : handler.get()) { - throw new AssertionError(e); // no digests in range slices yet + rows.add(row); + columnsCount += row.getLiveColumnCount(); + logger.debug("range slices read {}", row.key); } + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); + } + catch (TimeoutException ex) + { + if (logger.isDebugEnabled()) + logger.debug("Range slice timeout: {}", ex.toString()); + throw ex; + } + catch (DigestMismatchException e) + { + throw new AssertionError(e); // no digests in range slices yet } // if we're done, great, otherwise, move to the next range