aratno commented on code in PR #4145:
URL: https://github.com/apache/cassandra/pull/4145#discussion_r2079972514


##########
test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java:
##########
@@ -82,56 +77,44 @@ public void testBasicWriteForwarding() throws Throwable
                                         "AND replication_type='tracked';", 
keyspaceName));
             cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v 
int, primary key (k, c));", keyspaceName, tableName));
 
-            Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
             int ROWS = 100;
             for (int inserted = 0; inserted < ROWS; inserted++)
             {
                 // Writes should be completed for the client, regardless of 
whether they are forwarded or not
                 cluster.coordinator(inst(inserted)).execute(format("INSERT 
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), 
ConsistencyLevel.ALL, inserted, inserted, inserted);
+            }
 
-                // Writes should be ack'd in the journal too, but these could 
lag behind client acks, so could be
-                // permissive here. Each write should be reconciled on 1 
leader, unreconciled on TOTAL_RF-1
-                // replicas (until background reconciliation broadcast is 
implemented), and ignored on others.
-                Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
-                Set<IInvokableInstance> replicas = new HashSet<>();
-                for (IInvokableInstance instance : cluster)
-                {
-                    int unreconciled = instance.callOnInstance(() -> {
-                        Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
-                        Range<Token> fullRange = new Range<>(token, token);
-                        TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
-                        MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
-                        return summary.unreconciledIds();
-                    });
-                    int lastUnreconciled = 
instanceUnreconciled.getOrDefault(instance, 0);
-                    int newUnreconciled = unreconciled - lastUnreconciled;
+            Thread.sleep(1000); // allow time for all offsets to be broadcasted
 
-                    if (newUnreconciled == 0)
-                    {
-                        // instance already reconciled (as leader) or did not 
receive new mutation ID (non-replica)
-                        leaderOrNonReplica.add(instance);
-                    }
-                    else if (newUnreconciled == 1)
-                    {
-                        // instance has not reconciled, so it's a replica 
(until reconciliation broadcast is implemented)
-                        replicas.add(instance);
-                    }
-                    else
-                    {
-                        Assertions.fail("Should not have more than one new 
unreconciled mutation");
-                    }
-                    instanceUnreconciled.put(instance, unreconciled);
-                }
-                Assertions.assertThat(leaderOrNonReplica).hasSize(2);
-                Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
+            int allReconciled = 0;
+            int allUnreconciled = 0;
+
+            // Writes should be ack'd in the journal too, but these could lag 
behind client acks, so could be
+            // permissive here. Each write should be reconciled everywhere.
+            for (IInvokableInstance instance : cluster)
+            {
+                int reconciled = instance.callOnInstance(() -> {
+                    Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+                    Range<Token> fullRange = new Range<>(token, token);
+                    TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+                    MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
+                    return summary.reconciledIds();

Review Comment:
   Note for later - would be nice to have metrics to differentiate writes that 
are reconciled locally on the leader, from in-band write acknowledgements on 
replicas, or from broadcast.



##########
test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java:
##########
@@ -82,56 +77,44 @@ public void testBasicWriteForwarding() throws Throwable
                                         "AND replication_type='tracked';", 
keyspaceName));
             cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v 
int, primary key (k, c));", keyspaceName, tableName));
 
-            Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
             int ROWS = 100;
             for (int inserted = 0; inserted < ROWS; inserted++)
             {
                 // Writes should be completed for the client, regardless of 
whether they are forwarded or not
                 cluster.coordinator(inst(inserted)).execute(format("INSERT 
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), 
ConsistencyLevel.ALL, inserted, inserted, inserted);
+            }
 
-                // Writes should be ack'd in the journal too, but these could 
lag behind client acks, so could be
-                // permissive here. Each write should be reconciled on 1 
leader, unreconciled on TOTAL_RF-1
-                // replicas (until background reconciliation broadcast is 
implemented), and ignored on others.
-                Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
-                Set<IInvokableInstance> replicas = new HashSet<>();
-                for (IInvokableInstance instance : cluster)
-                {
-                    int unreconciled = instance.callOnInstance(() -> {
-                        Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
-                        Range<Token> fullRange = new Range<>(token, token);
-                        TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
-                        MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
-                        return summary.unreconciledIds();
-                    });
-                    int lastUnreconciled = 
instanceUnreconciled.getOrDefault(instance, 0);
-                    int newUnreconciled = unreconciled - lastUnreconciled;
+            Thread.sleep(1000); // allow time for all offsets to be broadcasted

Review Comment:
   Fine to merge as-is, but could replace this with a signal from each instance 
that a full cycle of broadcast has been received, via a BlockingQueue or 
semaphore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to