bdeggleston commented on code in PR #2981:
URL: https://github.com/apache/cassandra/pull/2981#discussion_r1483492372


##########
src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java:
##########
@@ -61,31 +60,32 @@ public class BlockingPartitionRepair
     private final ReplicaPlan.ForTokenWrite writePlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
-    private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
     public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, ReplicaPlan.ForTokenWrite writePlan)
-    {
-        this(key, repairs, writePlan,
-             writePlan.consistencyLevel().isDatacenterLocal() ? 
InOurDcTester.endpoints() : Predicates.alwaysTrue());
-    }
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> 
shouldBlockOn)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
         this.writePlan = writePlan;
-        this.shouldBlockOn = shouldBlockOn;
+
+        // make sure all the read repair targets are contact of the repair 
write plan
+        if (!all(repairs.keySet(), (r) -> writePlan.contacts().contains(r)))
+        {
+            throw new AssertionError("All repair targets should be part of 
contacts of read repair write plan.");

Review Comment:
   Can you use `Preconditions.checkState` here, or throw an 
`IllegalStateException`?



##########
src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java:
##########
@@ -61,31 +60,32 @@ public class BlockingPartitionRepair
     private final ReplicaPlan.ForTokenWrite writePlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
-    private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
     public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, ReplicaPlan.ForTokenWrite writePlan)
-    {
-        this(key, repairs, writePlan,
-             writePlan.consistencyLevel().isDatacenterLocal() ? 
InOurDcTester.endpoints() : Predicates.alwaysTrue());
-    }
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> 
shouldBlockOn)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
         this.writePlan = writePlan;
-        this.shouldBlockOn = shouldBlockOn;
+
+        // make sure all the read repair targets are contact of the repair 
write plan
+        if (!all(repairs.keySet(), (r) -> writePlan.contacts().contains(r)))
+        {
+            throw new AssertionError("All repair targets should be part of 
contacts of read repair write plan.");
+        }
 
         int blockFor = writePlan.blockFor();
         // here we remove empty repair mutations from the block for total, 
since
         // we're not sending them mutations
         for (Replica participant : writePlan.contacts())
         {
-            // remote dcs can sometimes get involved in dc-local reads. We 
want to repair
-            // them if they do, but they shouldn't interfere with blocking the 
client read.
-            if (!repairs.containsKey(participant) && 
shouldBlockOn.test(participant.endpoint()))
+            if (!repairs.containsKey(participant))
                 blockFor--;
+
+            // make sure for local consistency, all contacts are local replicas
+            if (writePlan.consistencyLevel().isDatacenterLocal() && 
!InOurDcTester.replicas().test(participant))
+                throw new AssertionError("Local consistency blocking read 
repair is trying to contact remote DC node: " + participant.endpoint());

Review Comment:
   Can you use `Preconditions.checkState` here, or throw an 
`IllegalStateException`?



##########
src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java:
##########
@@ -205,7 +199,9 @@ public void maybeSendAdditionalWrites(long timeout, 
TimeUnit timeoutUnit)
         if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, 
TimeUnit.NANOSECONDS), timeoutUnit))
             return;
 
-        EndpointsForToken newCandidates = writePlan.liveUncontacted();
+        EndpointsForToken newCandidates = writePlan.liveUncontacted().filter(

Review Comment:
   Can you rework this bit as:
   `writePlan.consistencyLevel().isDatacenterLocal() ? 
writePlan.liveUncontacted().filter(InOurDcTester.replicas()) : 
writePlan.liveUncontacted();` so we're only filtering if we need to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to