Repository: cassandra
Updated Branches:
  refs/heads/trunk c5a7fcaa8 -> eda2613bc


Fix some regressions caused by CASSANDRA-14058

Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-14353


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eda2613b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eda2613b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eda2613b

Branch: refs/heads/trunk
Commit: eda2613bcca7e4734b3bb4e04c6a4df39eb59ca2
Parents: c5a7fca
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Wed Mar 28 15:03:52 2018 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Fri Apr 6 16:20:47 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/reads/DataResolver.java   | 114 +++++++++++++++++--
 .../reads/repair/BlockingReadRepair.java        |  68 +++++++++--
 .../service/reads/repair/NoopReadRepair.java    |   1 -
 .../repair/PartitionIteratorMergeListener.java  |   5 -
 .../service/reads/repair/ReadRepair.java        |   1 +
 .../reads/repair/RowIteratorMergeListener.java  |  30 -----
 .../reads/repair/TestableReadRepair.java        |   1 -
 8 files changed, 165 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3db7f27..d6cf162 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix some regressions caused by 14058 (CASSANDRA-14353)
  * Abstract repair for pluggable storage (CASSANDRA-14116)
  * Add meaningful toString() impls (CASSANDRA-13653)
  * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java 
b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 11dd083..4c7a6c9 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -17,12 +17,17 @@
  */
 package org.apache.cassandra.service.reads;
 
-import java.net.InetAddress;
 import java.util.*;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
@@ -33,11 +38,6 @@ import org.apache.cassandra.tracing.TraceState;
 
 public class DataResolver extends ResponseResolver
 {
-    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
-        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
-
-    @VisibleForTesting
-    final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
     private final long queryStartNanoTime;
     private final boolean enforceStrictLiveness;
 
@@ -114,7 +114,7 @@ public class DataResolver extends ResponseResolver
                 results.set(i, ShortReadProtection.extend(sources[i], 
results.get(i), command, mergedResultCounter, queryStartNanoTime, 
enforceStrictLiveness));
             }
 
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), 
readRepair.getMergeListener(sources));
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), 
wrapMergeListener(readRepair.getMergeListener(sources), sources));
     }
 
     public void evaluateAllResponses()
@@ -130,4 +130,102 @@ public class DataResolver extends ResponseResolver
     {
         evaluateAllResponses();
     }
+
+    private String makeResponsesDebugString(DecoratedKey partitionKey)
+    {
+        return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> 
m.from + " => " + m.payload.toDebugString(command, partitionKey)));
+    }
+
+    private UnfilteredPartitionIterators.MergeListener 
wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, 
InetAddressAndPort[] sources)
+    {
+        return new UnfilteredPartitionIterators.MergeListener()
+        {
+            public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
+            {
+                UnfilteredRowIterators.MergeListener rowListener = 
partitionListener.getRowMergeListener(partitionKey, versions);
+
+                return new UnfilteredRowIterators.MergeListener()
+                {
+                    public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
+                    {
+                        try
+                        {
+                            
rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
+                        }
+                        catch (AssertionError e)
+                        {
+                            // The following can be pretty verbose, but it's 
really only triggered if a bug happen, so we'd
+                            // rather get more info to debug than not.
+                            TableMetadata table = command.metadata();
+                            String details = String.format("Error merging 
partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug 
info:%n %s",
+                                                           table,
+                                                           mergedDeletion == 
null ? "null" : mergedDeletion.toString(),
+                                                           '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString())) + ']',
+                                                           
Arrays.toString(sources),
+                                                           
makeResponsesDebugString(partitionKey));
+                            throw new AssertionError(details, e);
+                        }
+                    }
+
+                    public void onMergedRows(Row merged, Row[] versions)
+                    {
+                        try
+                        {
+                            rowListener.onMergedRows(merged, versions);
+                        }
+                        catch (AssertionError e)
+                        {
+                            // The following can be pretty verbose, but it's 
really only triggered if a bug happens, so we'd
+                            // rather get more info to debug than not.
+                            TableMetadata table = command.metadata();
+                            String details = String.format("Error merging rows 
on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
+                                                           table,
+                                                           merged == null ? 
"null" : merged.toString(table),
+                                                           '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString(table))) + ']',
+                                                           
Arrays.toString(sources),
+                                                           
makeResponsesDebugString(partitionKey));
+                            throw new AssertionError(details, e);
+                        }
+                    }
+
+                    public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
+                    {
+                        try
+                        {
+                            // The code for merging range tombstones is a tad 
complex and we had the assertions there triggered
+                            // unexpectedly in a few occasions 
(CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
when that happens without more context than what 
the assertion errors give us however, hence the
catch here that basically gathers as much 
context as is reasonable.
+                            rowListener.onMergedRangeTombstoneMarkers(merged, 
versions);
+                        }
+                        catch (AssertionError e)
+                        {
+
+                            // The following can be pretty verbose, but it's 
really only triggered if a bug happens, so we'd
+                            // rather get more info to debug than not.
+                            TableMetadata table = command.metadata();
+                            String details = String.format("Error merging RTs 
on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
+                                                           table,
+                                                           merged == null ? 
"null" : merged.toString(table),
+                                                           '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString(table))) + ']',
+                                                           
Arrays.toString(sources),
+                                                           
makeResponsesDebugString(partitionKey));
+                            throw new AssertionError(details, e);
+                        }
+
+                    }
+
+                    public void close()
+                    {
+                        rowListener.close();
+                    }
+                };
+            }
+
+            public void close()
+            {
+                partitionListener.close();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 07b6e2c..8689356 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -38,7 +38,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
@@ -57,7 +60,6 @@ import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.service.reads.ResponseResolver;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.concurrent.Accumulator;
 
 /**
  * 'Classic' read repair. Doesn't allow the client read to return until
@@ -67,6 +69,9 @@ public class BlockingReadRepair implements ReadRepair, 
RepairListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(BlockingReadRepair.class);
 
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
     private final ReadCommand command;
     private final List<InetAddressAndPort> endpoints;
     private final long queryStartNanoTime;
@@ -110,24 +115,66 @@ public class BlockingReadRepair implements ReadRepair, 
RepairListener
     {
 
         final List<AsyncOneResponse<?>> responses;
+        final ReadCommand command;
+        final ConsistencyLevel consistency;
 
-        public BlockingPartitionRepair(int expectedResponses)
+        public BlockingPartitionRepair(int expectedResponses, ReadCommand 
command, ConsistencyLevel consistency)
         {
             this.responses = new ArrayList<>(expectedResponses);
+            this.command = command;
+            this.consistency = consistency;
         }
 
-        protected AsyncOneResponse sendMutation(InetAddressAndPort endpoint, 
Mutation mutation)
+        private AsyncOneResponse sendRepairMutation(Mutation mutation, 
InetAddressAndPort destination)
         {
-            // use a separate verb here because we don't want these to be get 
the white glove hint-
-            // on-timeout behavior that a "real" mutation gets
-            Tracing.trace("Sending read-repair-mutation to {}", endpoint);
-            MessageOut<Mutation> msg = 
mutation.createMessage(MessagingService.Verb.READ_REPAIR);
-            return MessagingService.instance().sendRR(msg, endpoint);
+            DecoratedKey key = mutation.key();
+            Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+            int messagingVersion = 
MessagingService.instance().getVersion(destination);
+
+            int    mutationSize = (int) 
Mutation.serializer.serializedSize(mutation, messagingVersion);
+            int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+            AsyncOneResponse callback = null;
+
+            if (mutationSize <= maxMutationSize)
+            {
+                Tracing.trace("Sending read-repair-mutation to {}", 
destination);
+                // use a separate verb here to avoid writing hints on timeouts
+                MessageOut<Mutation> message = 
mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                callback = MessagingService.instance().sendRR(message, 
destination);
+                
ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
+            }
+            else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+            {
+                logger.debug("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
+                             mutationSize,
+                             maxMutationSize,
+                             command.metadata(),
+                             
command.metadata().partitionKeyType.getString(key.getKey()),
+                             destination);
+            }
+            else
+            {
+                logger.warn("Encountered an oversized ({}/{}) read repair 
mutation for table {}, key {}, node {}",
+                            mutationSize,
+                            maxMutationSize,
+                            command.metadata(),
+                            
command.metadata().partitionKeyType.getString(key.getKey()),
+                            destination);
+
+                int blockFor = consistency.blockFor(keyspace);
+                Tracing.trace("Timed out while read-repairing after receiving 
all {} data and digest responses", blockFor);
+                throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
+            }
+            return callback;
         }
 
         public void reportMutation(InetAddressAndPort endpoint, Mutation 
mutation)
         {
-            responses.add(sendMutation(endpoint, mutation));
+            AsyncOneResponse<?> response = sendRepairMutation(mutation, 
endpoint);
+
+            if (response != null)
+                responses.add(response);
         }
 
         public void finish()
@@ -169,12 +216,11 @@ public class BlockingReadRepair implements ReadRepair, 
RepairListener
         {
             throw new RuntimeException(e);
         }
-
     }
 
     public PartitionRepair startPartitionRepair()
     {
-        BlockingPartitionRepair repair = new 
BlockingPartitionRepair(endpoints.size());
+        BlockingPartitionRepair repair = new 
BlockingPartitionRepair(endpoints.size(), command, consistency);
         repairs.add(repair);
         return repair;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index ff65dbb..39f5bff 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 import java.util.function.Consumer;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 3ad57cf..4ccdcbf 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -20,9 +20,6 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,8 +32,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class PartitionIteratorMergeListener implements 
UnfilteredPartitionIterators.MergeListener
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(PartitionIteratorMergeListener.class);
-
     private final InetAddressAndPort[] sources;
     private final ReadCommand command;
     private final RepairListener repairListener;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index bdd730c..21cab20 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.tracing.TraceState;
 
 public interface ReadRepair
 {
+
     /**
      * Used by DataResolver to generate corrections as the partition iterator 
is consumed
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 44b6eeb..f11d264 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -20,9 +20,6 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.Arrays;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringBound;
 import org.apache.cassandra.db.DecoratedKey;
@@ -44,7 +41,6 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
 
 public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeListener
 {
@@ -186,32 +182,6 @@ public class RowIteratorMergeListener implements 
UnfilteredRowIterators.MergeLis
 
     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
     {
-        try
-        {
-            // The code for merging range tombstones is a tad complex and we 
had the assertions there triggered
-            // unexpectedly in a few occasions (CASSANDRA-13237, 
CASSANDRA-13719). It's hard to get insights
-            // when that happen without more context that what the assertion 
errors give us however, hence the
-            // catch here that basically gather as much as context as 
reasonable.
-            internalOnMergedRangeTombstoneMarkers(merged, versions);
-        }
-        catch (AssertionError e)
-        {
-            // The following can be pretty verbose, but it's really only 
triggered if a bug happen, so we'd
-            // rather get more info to debug than not.
-            TableMetadata table = command.metadata();
-            String details = String.format("Error merging RTs on %s: 
command=%s, reversed=%b, merged=%s, versions=%s, sources={%s}",
-                                           table,
-                                           command.toCQLString(),
-                                           isReversed,
-                                           merged == null ? "null" : 
merged.toString(table),
-                                           '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString(table))) + ']',
-                                           Arrays.toString(sources));
-            throw new AssertionError(details, e);
-        }
-    }
-
-    private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker 
merged, RangeTombstoneMarker[] versions)
-    {
         // The current deletion as of dealing with this marker.
         DeletionTime currentDeletion = currentDeletion();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index d125c9d..522e524 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to