aweisberg commented on code in PR #4569:
URL: https://github.com/apache/cassandra/pull/4569#discussion_r2718105627


##########
src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import 
org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+/** Incremental repair task for keyspaces using mutation tracking */
+public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
+{
+    private static final long SYNC_TIMEOUT_MINUTES = 30;
+
+    private final TimeUUID parentSession;
+    private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges;
+    private final String[] cfnames;
+
+    protected MutationTrackingIncrementalRepairTask(RepairCoordinator 
coordinator,
+                                                    TimeUUID parentSession,
+                                                    
RepairCoordinator.NeighborsAndRanges neighborsAndRanges,
+                                                    String[] cfnames)
+    {
+        super(coordinator);
+        this.parentSession = parentSession;
+        this.neighborsAndRanges = neighborsAndRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "MutationTrackingIncrementalRepair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor, Scheduler validationScheduler)
+    {
+        List<CommonRange> allRanges = 
neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+        if (allRanges.isEmpty())
+        {
+            logger.info("No common ranges to repair for keyspace {}", 
keyspace);
+            return new 
AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(),
 List.of()));
+        }
+
+        List<MutationTrackingSyncCoordinator> syncCoordinators = new 
ArrayList<>();
+        List<Collection<Range<Token>>> rangeCollections = new ArrayList<>();
+
+        for (CommonRange commonRange : allRanges)
+        {
+            for (Range<Token> range : commonRange.ranges)
+            {
+                MutationTrackingSyncCoordinator syncCoordinator = new 
MutationTrackingSyncCoordinator(keyspace, range);
+                syncCoordinator.start();
+                syncCoordinators.add(syncCoordinator);
+                rangeCollections.add(List.of(range));
+
+                logger.info("Started mutation tracking sync for range {}", 
range);
+            }
+        }
+
+        coordinator.notifyProgress("Started mutation tracking sync for " + 
syncCoordinators.size() + " ranges");
+
+        AsyncPromise<CoordinatedRepairResult> resultPromise = new 
AsyncPromise<>();
+
+        executor.execute(() -> {
+            try
+            {
+                waitForSyncCompletion(syncCoordinators, executor, 
validationScheduler, allRanges, rangeCollections, resultPromise);
+            }
+            catch (Exception e)
+            {
+                logger.error("Error during mutation tracking repair", e);
+                resultPromise.tryFailure(e);
+            }
+        });
+
+        return resultPromise;
+    }
+
+    private void waitForSyncCompletion(List<MutationTrackingSyncCoordinator> 
syncCoordinators,
+                                       ExecutorPlus executor,
+                                       Scheduler validationScheduler,
+                                       List<CommonRange> allRanges,
+                                       List<Collection<Range<Token>>> 
rangeCollections,
+                                       AsyncPromise<CoordinatedRepairResult> 
resultPromise) throws InterruptedException
+    {
+        boolean allSucceeded = true;
+        for (MutationTrackingSyncCoordinator syncCoordinator : 
syncCoordinators)
+        {
+            boolean completed = 
syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+            if (!completed)
+            {
+                logger.warn("Mutation tracking sync timed out for keyspace {} 
range {}",
+                            keyspace, syncCoordinator.getRange());
+                syncCoordinator.cancel();
+                allSucceeded = false;
+            }
+        }
+
+        if (!allSucceeded)
+        {
+            resultPromise.tryFailure(new RuntimeException("Mutation tracking 
sync timed out for some ranges"));
+            return;
+        }
+
+        coordinator.notifyProgress("Mutation tracking sync completed for all 
ranges");
+
+        if (requiresTraditionalRepair(keyspace))
+        {
+            runTraditionalRepairForMigration(executor, validationScheduler, 
allRanges, resultPromise);
+        }
+        else
+        {
+            // Pure mutation tracking - create successful result
+            
resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, 
List.of()));

Review Comment:
   I think the list of `RepairSessionResult` needs to be populated for other 
things, like migration to Accord, to work. In general it's risky to leave it 
empty given that there are downstream consumers.



##########
src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import 
org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+/** Incremental repair task for keyspaces using mutation tracking */
+public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
+{
+    private static final long SYNC_TIMEOUT_MINUTES = 30;

Review Comment:
   This should be configurable, but there is also existing retry and timeout 
handling for repair messaging. Maybe that can be re-used without introducing 
additional configuration.
   
   It might be that repair has no timeouts and things simply retry forever. I 
would need to look.



##########
src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class MutationTrackingSyncCoordinator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class);
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();
+
+    // Per-shard state: tracks what each node has reported for that shard
+    private final Map<Range<Token>, ShardSyncState> shardStates = new 
ConcurrentHashMap<>();

Review Comment:
   I don't think this needs to be a `ConcurrentHashMap` (CHM) because it's built 
once and must be fully populated before it can be accessed from multiple 
threads. More on this in a different comment.



##########
src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class MutationTrackingSyncCoordinator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class);
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();
+
+    // Per-shard state: tracks what each node has reported for that shard
+    private final Map<Range<Token>, ShardSyncState> shardStates = new 
ConcurrentHashMap<>();
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+
+    public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range)
+    {
+        this.keyspace = keyspace;
+        this.range = range;
+    }
+
+    public void start()
+    {
+        if (!started.compareAndSet(false, true))
+            throw new IllegalStateException("Sync coordinator already 
started");
+
+        List<Shard> overlappingShards;
+
+        overlappingShards = new ArrayList<>();
+        MutationTrackingService.instance.forEachShardInKeyspace(keyspace, 
shard -> {
+            if (shard.range.intersects(range))
+                overlappingShards.add(shard);
+        });
+
+        if (overlappingShards.isEmpty())
+        {
+            completionFuture.setSuccess(null);
+            return;
+        }
+
+        // Register to receive offset updates
+        MutationTrackingService.instance.registerSyncCoordinator(this);
+
+        // Initialize state for each shard and capture targets
+        for (Shard shard : overlappingShards)

Review Comment:
   Move this before registering the sync coordinator so it doesn't need to be a 
CHM since it will be immutable.
   
   It's also a correctness issue: while `shardStates` is being populated, it's 
possible that the listener fires in another thread, sees no shards, and 
decides the sync is complete.



##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets()
         }
     }
 
+    /**
+     * Returns the UNION of all witnessed offsets from all participants.
+     * This represents all offsets that ANY replica has witnessed.
+     */
+    Offsets.Immutable collectUnionOfWitnessedOffsets()

Review Comment:
   How often will they broadcast if we just listen? I'm wondering whether, in 
the interest of repairs starting quickly (so that things like tests run 
quickly), we should either proactively message the replicas or change the 
broadcast interval for tests.



##########
test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.replication;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+
+import static org.junit.Assert.*;
+
+/**
+ * Distributed tests for MutationTrackingSyncCoordinator.
+ *
+ * Tests that the sync coordinator correctly waits for offset convergence
+ * across all nodes in a cluster.
+ */
+public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
+{
+    private static final String KS_NAME = "sync_test_ks";
+    private static final String TBL_NAME = "sync_test_tbl";
+
+    private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix)
+    {
+        String ksName = KS_NAME + keyspaceSuffix;
+        cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication 
= " +
+                             "{'class': 'SimpleStrategy', 
'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+        cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k 
int PRIMARY KEY, v int)");
+    }
+
+    private String tableName(String suffix)
+    {
+        return KS_NAME + suffix + '.' + TBL_NAME;
+    }
+
+    private void pauseOffsetBroadcasts(Cluster cluster, boolean pause)
+    {
+        for (int i = 1; i <= cluster.size(); i++)
+            cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance.pauseOffsetBroadcast(pause));
+    }
+
+    private static Range<Token> fullTokenRange()
+    {
+        return new Range<>(
+            new Murmur3Partitioner.LongToken(Long.MIN_VALUE),
+            new Murmur3Partitioner.LongToken(Long.MAX_VALUE)
+        );
+    }
+
+    @Test
+    public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "");
+
+            // Create a sync coordinator for a range that has no data
+            // It should complete immediately since there are no offsets to 
sync
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new 
MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(5, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete when there are no 
pending offsets", completed);
+        }
+    }
+
+    @Test
+    public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(6).start())
+        {
+            createTrackedKeyspace(cluster, "2");
+
+            // Insert some data to create mutations
+            for (int i = 0; i < 10000; i++)
+            {
+                cluster.coordinator(1).execute(
+                    "INSERT INTO " + tableName("2") + " (k, v) VALUES (?, ?)",
+                    ConsistencyLevel.ALL, i, i
+                );
+            }
+
+            Thread.sleep(500); // Wait for offset broadcasts to propagate
+
+            // Create a sync coordinator - should complete since all data is 
synced (CL.ALL)
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new 
MutationTrackingSyncCoordinator(KS_NAME + '2', fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    // Give it enough time for broadcasts to arrive
+                    return coordinator.awaitCompletion(15, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete after data is fully 
replicated", completed);
+        }
+    }
+
+    @Test
+    public void testSyncCoordinatorWaitsForAllReplicasMutations() throws 
Throwable

Review Comment:
   I think this test doesn't successfully get at what we are checking because 
it follows the happy path a little too closely and then doesn't validate that 
`MutationTrackingSyncCoordinator` actually had to wait.
   
   When you write at CL.ONE it still sends the write to all replicas so you 
don't actually end up having to wait for mutation tracking to do anything.
   
   I think the thing to do in this scenario is write at CL.ONE on one 
coordinator and block all messages (there are message filters you can set 
pretty easily) from that coordinator so you know the write hasn't propagated.
   
   Then create the `MutationTrackingSyncCoordinator` and have another thread 
wait on it doing its thing. This should not complete at this point because 
messages are still blocked. One thing this will highlight is the lack of 
retries in repair collecting the offsets (if you don't add retries).
   
   After waiting some amount of time to confirm that it is indeed still 
waiting, you can remove the message filters, and then you should see the thread 
waiting on `MutationTrackingSyncCoordinator` complete.
   
   At that point you need to go in and read the data from each node and check 
that it actually propagated and matches.
   
   When reading from individual nodes there is a path that skips all the 
distributed machinery and guarantees you are reading precisely from that node 
and nothing else. If you read at CL.ONE it's possible that the replica a 
coordinator selects isn't actually the node coordinating the query.



##########
test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.replication;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+
+import static org.junit.Assert.*;
+
+/**
+ * Distributed tests for MutationTrackingSyncCoordinator.
+ *
+ * Tests that the sync coordinator correctly waits for offset convergence
+ * across all nodes in a cluster.
+ */
+public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl

Review Comment:
   This test instantiates the coordinator directly and doesn't test repair end 
to end. There should be an end-to-end test somewhere.
   
   We also need end to end tests of migration to/from mutation tracking.
   
   I think we also support cancellation of repairs so we need a test that 
cancellation works, but you should double check how that works.



##########
test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.replication;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+
+import static org.junit.Assert.*;
+
+/**
+ * Distributed tests for MutationTrackingSyncCoordinator.
+ *
+ * Tests that the sync coordinator correctly waits for offset convergence
+ * across all nodes in a cluster.
+ */
+public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
+{
+    private static final String KS_NAME = "sync_test_ks";
+    private static final String TBL_NAME = "sync_test_tbl";
+
+    private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix)
+    {
+        String ksName = KS_NAME + keyspaceSuffix;
+        cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication 
= " +
+                             "{'class': 'SimpleStrategy', 
'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+        cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k 
int PRIMARY KEY, v int)");
+    }
+
+    private String tableName(String suffix)
+    {
+        return KS_NAME + suffix + '.' + TBL_NAME;
+    }
+
+    private void pauseOffsetBroadcasts(Cluster cluster, boolean pause)
+    {
+        for (int i = 1; i <= cluster.size(); i++)
+            cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance.pauseOffsetBroadcast(pause));
+    }
+
+    private static Range<Token> fullTokenRange()
+    {
+        return new Range<>(
+            new Murmur3Partitioner.LongToken(Long.MIN_VALUE),
+            new Murmur3Partitioner.LongToken(Long.MAX_VALUE)
+        );
+    }
+
+    @Test
+    public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "");
+
+            // Create a sync coordinator for a range that has no data
+            // It should complete immediately since there are no offsets to 
sync
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new 
MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(5, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete when there are no 
pending offsets", completed);
+        }
+    }
+
+    @Test
+    public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable

Review Comment:
   I think this test ends up being redundant with 
`testSyncCoordinatorWaitsForAllReplicasMutations`?



##########
src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class MutationTrackingSyncCoordinator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class);
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();
+
+    // Per-shard state: tracks what each node has reported for that shard
+    private final Map<Range<Token>, ShardSyncState> shardStates = new 
ConcurrentHashMap<>();
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+
+    public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range)
+    {
+        this.keyspace = keyspace;
+        this.range = range;
+    }
+
+    public void start()
+    {
+        if (!started.compareAndSet(false, true))
+            throw new IllegalStateException("Sync coordinator already 
started");
+
+        List<Shard> overlappingShards;
+
+        overlappingShards = new ArrayList<>();
+        MutationTrackingService.instance.forEachShardInKeyspace(keyspace, 
shard -> {
+            if (shard.range.intersects(range))
+                overlappingShards.add(shard);
+        });
+
+        if (overlappingShards.isEmpty())
+        {
+            completionFuture.setSuccess(null);
+            return;
+        }
+
+        // Register to receive offset updates
+        MutationTrackingService.instance.registerSyncCoordinator(this);
+
+        // Initialize state for each shard and capture targets
+        for (Shard shard : overlappingShards)
+        {
+            ShardSyncState state = new ShardSyncState(shard);
+            state.captureTargets();
+            shardStates.put(shard.range, state);
+        }
+
+        if (checkIfComplete())
+        {
+            complete();
+            return;
+        }
+
+        logger.info("Sync coordinator started for keyspace {} range {}, 
tracking {} shards",
+                   keyspace, range, overlappingShards.size());
+    }
+
+    private void complete()
+    {
+        if (!completed.compareAndSet(false, true))
+            return;
+        MutationTrackingService.instance.unregisterSyncCoordinator(this);
+        completionFuture.setSuccess(null);
+    }
+
+    private boolean checkIfComplete()
+    {
+        for (ShardSyncState state : shardStates.values())
+        {
+            if (!state.isComplete())
+                return false;
+        }
+        return true;
+    }
+
+    public void onOffsetsReceived()
+    {
+        if (completed.get())
+            return;
+
+        // The underlying CoordinatorLog already updates its reconciled 
offsets.
+        // We just need to re-check if we're now complete.
+        if (checkIfComplete())
+        {
+            complete();
+        }
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public Range<Token> getRange()
+    {
+        return range;
+    }
+
+    /**
+     * Blocks until sync completes or timeout is reached.
+     *
+     * @param timeout Maximum time to wait
+     * @param unit Time unit
+     * @return true if completed, false if timed out
+     */
+    public boolean awaitCompletion(long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+        try
+        {
+            completionFuture.get(timeout, unit);
+            return true;
+        }
+        catch (java.util.concurrent.TimeoutException e)
+        {
+            return false;
+        }
+        catch (java.util.concurrent.ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    public void cancel()
+    {
+        if (completed.compareAndSet(false, true))
+        {
+            MutationTrackingService.instance.unregisterSyncCoordinator(this);
+            completionFuture.setFailure(new RuntimeException("Sync 
cancelled"));
+        }
+    }
+
+    /**
+     * Tracks sync state for a single shard.
+     */
+    private static class ShardSyncState
+    {
+        private final Shard shard;

Review Comment:
   This reference to the shard can become stale because `Shard.withParticipants` 
can get called. That can also result in logs being added or removed, and in 
`withParticipants` being called on the logs themselves.
   
   This ties into how repair handles topology change in general. I think the 
right behavior for now is to detect it and fail the repair and we can come back 
and do something more involved later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to