http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2445cfb/src/java/org/apache/cassandra/service/DataResolver.java
--
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 60cfbba,0000000..c96a893
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -1,498 -1,0 +1,498 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DataResolver extends ResponseResolver
+{
+    @VisibleForTesting
+    final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+
+    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    public PartitionIterator getData()
+    {
+        ReadResponse response = responses.iterator().next().payload;
+        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
+    }
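For orientation, getData() simply takes the first response received and converts its "unfiltered" iterator into a filtered one for the query's nowInSec(). As a loose, self-contained analogue in plain Java (hypothetical names, not Cassandra's actual filtering code), filtering here amounts to dropping data that is no longer live at the query timestamp:

import java.util.*;
import java.util.stream.Collectors;

public class FilterByQueryTimeSketch
{
    public static void main(String[] args)
    {
        long nowInSec = 1000;

        // value -> second at which it stops being live (a stand-in for TTLs/tombstones)
        Map<String, Long> unfiltered = new LinkedHashMap<>();
        unfiltered.put("live-cell", 2000L);
        unfiltered.put("expired-cell", 500L);

        // Keep only what is still live at the query's nowInSec; conceptually this is
        // the unfiltered -> filtered conversion done above.
        List<String> filtered = unfiltered.entrySet().stream()
                                          .filter(e -> e.getValue() > nowInSec)
                                          .map(Map.Entry::getKey)
                                          .collect(Collectors.toList());

        System.out.println(filtered); // prints: [live-cell]
    }
}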
+
+    public PartitionIterator resolve()
+    {
+        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
+        // at the beginning of this method), so grab the response count once and use that throughout the method.
+        int count = responses.size();
+        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
+        InetAddress[] sources = new InetAddress[count];
+        for (int i = 0; i < count; i++)
+        {
+            MessageIn<ReadResponse> msg = responses.get(i);
+            iters.add(msg.payload.makeIterator(command));
+            sources[i] = msg.from;
+        }
+
+        // Even though every response should honor the limit, we might have more than requested post reconciliation,
+        // so ensure we're respecting the limit.
-         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
++        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+    }
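The counter re-applied at the end of resolve() matters because each replica trims its own response to the limit independently, yet the reconciled union of those responses can still contain more rows than requested. A minimal sketch of that effect in plain Java (illustrative only, not Cassandra's DataLimits API):

import java.util.*;
import java.util.stream.Collectors;

public class PostMergeLimitSketch
{
    public static void main(String[] args)
    {
        int limit = 3;

        // Two replicas, each already trimmed to the limit, but holding different rows.
        List<String> replica1 = Arrays.asList("a", "b", "c");
        List<String> replica2 = Arrays.asList("b", "c", "d");

        // Reconciliation (here: a simple de-duplicating merge) can exceed the limit...
        SortedSet<String> merged = new TreeSet<>();
        merged.addAll(replica1);
        merged.addAll(replica2); // merged = [a, b, c, d], i.e. 4 rows for a limit of 3

        // ...so the limit must be enforced again on the merged stream, which is what
        // counter.applyTo(...) does above.
        List<String> result = merged.stream().limit(limit).collect(Collectors.toList());
        System.out.println(result); // prints: [a, b, c]
    }
}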
+
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
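compareResponses() deliberately throws the resolved data away; it only drains the iterator so that the lazily-applied merge logic sees every row and can schedule the repair mutations. A small stand-alone illustration of draining an iterator purely for its side effects (hypothetical names, not the actual RepairMergeListener):

import java.util.*;
import java.util.function.Consumer;

public class DrainForSideEffects
{
    // An iterator that fires a callback for every element it yields, standing in
    // for a merge listener that schedules repair writes as rows are compared.
    static Iterator<Integer> withListener(Iterator<Integer> source, Consumer<Integer> listener)
    {
        return new Iterator<Integer>()
        {
            public boolean hasNext() { return source.hasNext(); }
            public Integer next()
            {
                Integer v = source.next();
                listener.accept(v); // the side effect happens only when the element is pulled
                return v;
            }
        };
    }

    public static void main(String[] args)
    {
        List<Integer> repairs = new ArrayList<>();
        Iterator<Integer> it = withListener(Arrays.asList(1, 2, 3).iterator(), repairs::add);

        // Discard the data but drain the iterator; without draining, no side effects fire.
        while (it.hasNext())
            it.next();

        System.out.println(repairs); // prints: [1, 2, 3]
    }
}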
+
+    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    {
+        // If we have only one result, there is no read repair to do and we can't get short reads
+        if (results.size() == 1)
+            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+
+        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
+
+        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
+        // but that subs