aweisberg commented on code in PR #4569: URL: https://github.com/apache/cassandra/pull/4569#discussion_r2718105627
########## src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +/** Incremental repair task for keyspaces using mutation tracking */ +public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask +{ + private static final long SYNC_TIMEOUT_MINUTES = 30; + + private final TimeUUID parentSession; + private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges; + private final 
String[] cfnames; + + protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator, + TimeUUID parentSession, + RepairCoordinator.NeighborsAndRanges neighborsAndRanges, + String[] cfnames) + { + super(coordinator); + this.parentSession = parentSession; + this.neighborsAndRanges = neighborsAndRanges; + this.cfnames = cfnames; + } + + @Override + public String name() + { + return "MutationTrackingIncrementalRepair"; + } + + @Override + public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) + { + List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); + + if (allRanges.isEmpty()) + { + logger.info("No common ranges to repair for keyspace {}", keyspace); + return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of())); + } + + List<MutationTrackingSyncCoordinator> syncCoordinators = new ArrayList<>(); + List<Collection<Range<Token>>> rangeCollections = new ArrayList<>(); + + for (CommonRange commonRange : allRanges) + { + for (Range<Token> range : commonRange.ranges) + { + MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range); + syncCoordinator.start(); + syncCoordinators.add(syncCoordinator); + rangeCollections.add(List.of(range)); + + logger.info("Started mutation tracking sync for range {}", range); + } + } + + coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges"); + + AsyncPromise<CoordinatedRepairResult> resultPromise = new AsyncPromise<>(); + + executor.execute(() -> { + try + { + waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise); + } + catch (Exception e) + { + logger.error("Error during mutation tracking repair", e); + resultPromise.tryFailure(e); + } + }); + + return resultPromise; + } + + private void 
waitForSyncCompletion(List<MutationTrackingSyncCoordinator> syncCoordinators, + ExecutorPlus executor, + Scheduler validationScheduler, + List<CommonRange> allRanges, + List<Collection<Range<Token>>> rangeCollections, + AsyncPromise<CoordinatedRepairResult> resultPromise) throws InterruptedException + { + boolean allSucceeded = true; + for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators) + { + boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES); + if (!completed) + { + logger.warn("Mutation tracking sync timed out for keyspace {} range {}", + keyspace, syncCoordinator.getRange()); + syncCoordinator.cancel(); + allSucceeded = false; + } + } + + if (!allSucceeded) + { + resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges")); + return; + } + + coordinator.notifyProgress("Mutation tracking sync completed for all ranges"); + + if (requiresTraditionalRepair(keyspace)) + { + runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise); + } + else + { + // Pure mutation tracking - create successful result + resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of())); Review Comment: I think the list of `RepairSessionResult` needs to be populated for other things, like migration to Accord, to work. In general it's questionable to leave that list empty, given that there are downstream consumers of it. ########## src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +/** Incremental repair task for keyspaces using mutation tracking */ +public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask +{ + private static final long SYNC_TIMEOUT_MINUTES = 30; Review Comment: This should be configurable, but also there is existing retry and timeout stuff for repair messaging. Maybe that can be re-used without introducing additional configuration. It might be we don't have timeouts and things retry forever in repair. I would need to look. ########## src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.concurrent.AsyncPromise; + +public class MutationTrackingSyncCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); + + private final String keyspace; + private final Range<Token> range; + private final AsyncPromise<Void> completionFuture = new AsyncPromise<>(); + + // Per-shard state: tracks what each node has reported for that shard + private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>(); Review Comment: I don't think this needs to be a CHM because it's built once and needs to be built and populated before it can be accessed from multiple threads. More on this in a different comment. ########## src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.concurrent.AsyncPromise; + +public class MutationTrackingSyncCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); + + private final String keyspace; + private final Range<Token> range; + private final AsyncPromise<Void> completionFuture = new AsyncPromise<>(); + + // Per-shard state: tracks what each node has reported for that shard + private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>(); + + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean completed = new AtomicBoolean(false); + + public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range) + { + this.keyspace = keyspace; + this.range = range; + } + + public void start() + { + if (!started.compareAndSet(false, true)) + throw new 
IllegalStateException("Sync coordinator already started"); + + List<Shard> overlappingShards; + + overlappingShards = new ArrayList<>(); + MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.intersects(range)) + overlappingShards.add(shard); + }); + + if (overlappingShards.isEmpty()) + { + completionFuture.setSuccess(null); + return; + } + + // Register to receive offset updates + MutationTrackingService.instance.registerSyncCoordinator(this); + + // Initialize state for each shard and capture targets + for (Shard shard : overlappingShards) Review Comment: Move this before registering the sync coordinator; the map will then be immutable, so it doesn't need to be a CHM. It's also a correctness issue: while `shardStates` is being populated, the listener can fire on another thread, see no shards, and decide the sync is complete. ########## src/java/org/apache/cassandra/replication/CoordinatorLog.java: ########## @@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets() } } + /** + * Returns the UNION of all witnessed offsets from all participants. + * This represents all offsets that ANY replica has witnessed. + */ + Offsets.Immutable collectUnionOfWitnessedOffsets() Review Comment: How often will they broadcast if we just listen? I want repairs to start quickly, partly so that things like tests run quickly. For that reason, I'm wondering whether we should proactively send messages, or maybe change the broadcast interval for tests. ########## test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.replication; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; + +import static org.junit.Assert.*; + +/** + * Distributed tests for MutationTrackingSyncCoordinator. + * + * Tests that the sync coordinator correctly waits for offset convergence + * across all nodes in a cluster. + */ +public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl +{ + private static final String KS_NAME = "sync_test_ks"; + private static final String TBL_NAME = "sync_test_tbl"; + + private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix) + { + String ksName = KS_NAME + keyspaceSuffix; + cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + ksName + '.' 
+ TBL_NAME + " (k int PRIMARY KEY, v int)"); + } + + private String tableName(String suffix) + { + return KS_NAME + suffix + '.' + TBL_NAME; + } + + private void pauseOffsetBroadcasts(Cluster cluster, boolean pause) + { + for (int i = 1; i <= cluster.size(); i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(pause)); + } + + private static Range<Token> fullTokenRange() + { + return new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + } + + @Test + public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + createTrackedKeyspace(cluster, ""); + + // Create a sync coordinator for a range that has no data + // It should complete immediately since there are no offsets to sync + Boolean completed = cluster.get(1).callOnInstance(() -> { + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange()); + coordinator.start(); + + try + { + return coordinator.awaitCompletion(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync coordinator should complete when there are no pending offsets", completed); + } + } + + @Test + public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable + { + try (Cluster cluster = builder().withNodes(6).start()) + { + createTrackedKeyspace(cluster, "2"); + + // Insert some data to create mutations + for (int i = 0; i < 10000; i++) + { + cluster.coordinator(1).execute( + "INSERT INTO " + tableName("2") + " (k, v) VALUES (?, ?)", + ConsistencyLevel.ALL, i, i + ); + } + + Thread.sleep(500); // Wait for offset broadcasts to propagate + + // Create a sync coordinator - should complete since all data is synced (CL.ALL) + Boolean completed = cluster.get(1).callOnInstance(() -> { + MutationTrackingSyncCoordinator 
coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullTokenRange()); + coordinator.start(); + + try + { + // Give it enough time for broadcasts to arrive + return coordinator.awaitCompletion(15, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync coordinator should complete after data is fully replicated", completed); + } + } + + @Test + public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable Review Comment: I think this test doesn't really check what we want to check. It follows the happy path a little too closely, and it doesn't validate that `MutationTrackingSyncCoordinator` actually had to wait. When you write at CL.ONE, the write is still sent to all replicas, so you never actually have to wait for mutation tracking to do anything. I think the thing to do in this scenario is: write at CL.ONE on one coordinator and block all messages from that coordinator, so you know the write hasn't propagated. There are message filters you can set up pretty easily for this. Then create the `MutationTrackingSyncCoordinator` and have another thread wait for it to complete. It should not complete at this point because messages are still blocked. One thing this will highlight is the lack of retries when repair collects the offsets (if you don't add retries). After waiting long enough to confirm that it really is blocked, remove the message filters. You should then see the thread waiting on `MutationTrackingSyncCoordinator` complete. At that point, read the data from each node and check that it actually propagated and matches. When reading from individual nodes there is a path that skips all the distributed machinery and guarantees you are reading precisely from that node and nothing else.
If you read at CL.ONE it's possible that the replica a coordinator selects isn't actually the node coordinating the query. ########## test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.replication; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; + +import static org.junit.Assert.*; + +/** + * Distributed tests for MutationTrackingSyncCoordinator. + * + * Tests that the sync coordinator correctly waits for offset convergence + * across all nodes in a cluster. 
+ */ +public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl Review Comment: This test is instantiating the coordinator directly and isn't end to end testing repair. There should be an end to end test somewhere. We also need end to end tests of migration to/from mutation tracking. I think we also support cancellation of repairs so we need a test that cancellation works, but you should double check how that works. ########## test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.distributed.test.replication; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; + +import static org.junit.Assert.*; + +/** + * Distributed tests for MutationTrackingSyncCoordinator. + * + * Tests that the sync coordinator correctly waits for offset convergence + * across all nodes in a cluster. + */ +public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl +{ + private static final String KS_NAME = "sync_test_ks"; + private static final String TBL_NAME = "sync_test_tbl"; + + private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix) + { + String ksName = KS_NAME + keyspaceSuffix; + cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)"); + } + + private String tableName(String suffix) + { + return KS_NAME + suffix + '.' 
+ TBL_NAME; + } + + private void pauseOffsetBroadcasts(Cluster cluster, boolean pause) + { + for (int i = 1; i <= cluster.size(); i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(pause)); + } + + private static Range<Token> fullTokenRange() + { + return new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + } + + @Test + public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + createTrackedKeyspace(cluster, ""); + + // Create a sync coordinator for a range that has no data + // It should complete immediately since there are no offsets to sync + Boolean completed = cluster.get(1).callOnInstance(() -> { + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange()); + coordinator.start(); + + try + { + return coordinator.awaitCompletion(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync coordinator should complete when there are no pending offsets", completed); + } + } + + @Test + public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable Review Comment: I think this test ends up being redundant with `testSyncCoordinatorWaitsForAllReplicasMutations`? ########## src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.concurrent.AsyncPromise; + +public class MutationTrackingSyncCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); + + private final String keyspace; + private final Range<Token> range; + private final AsyncPromise<Void> completionFuture = new AsyncPromise<>(); + + // Per-shard state: tracks what each node has reported for that shard + private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>(); + + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean completed = new AtomicBoolean(false); + + public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range) + { + this.keyspace = keyspace; + this.range = range; + } + + public void start() + { + if (!started.compareAndSet(false, true)) + throw new IllegalStateException("Sync coordinator already started"); + + List<Shard> overlappingShards; + + overlappingShards = new ArrayList<>(); + MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.intersects(range)) + overlappingShards.add(shard); + 
}); + + if (overlappingShards.isEmpty()) + { + completionFuture.setSuccess(null); + return; + } + + // Register to receive offset updates + MutationTrackingService.instance.registerSyncCoordinator(this); + + // Initialize state for each shard and capture targets + for (Shard shard : overlappingShards) + { + ShardSyncState state = new ShardSyncState(shard); + state.captureTargets(); + shardStates.put(shard.range, state); + } + + if (checkIfComplete()) + { + complete(); + return; + } + + logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards", + keyspace, range, overlappingShards.size()); + } + + private void complete() + { + if (!completed.compareAndSet(false, true)) + return; + MutationTrackingService.instance.unregisterSyncCoordinator(this); + completionFuture.setSuccess(null); + } + + private boolean checkIfComplete() + { + for (ShardSyncState state : shardStates.values()) + { + if (!state.isComplete()) + return false; + } + return true; + } + + public void onOffsetsReceived() + { + if (completed.get()) + return; + + // The underlying CoordinatorLog already updates its reconciled offsets. + // We just need to re-check if we're now complete. + if (checkIfComplete()) + { + complete(); + } + } + + public String getKeyspace() + { + return keyspace; + } + + public Range<Token> getRange() + { + return range; + } + + /** + * Blocks until sync completes or timeout is reached. 
+ * + * @param timeout Maximum time to wait + * @param unit Time unit + * @return true if completed, false if timed out + */ + public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException + { + try + { + completionFuture.get(timeout, unit); + return true; + } + catch (java.util.concurrent.TimeoutException e) + { + return false; + } + catch (java.util.concurrent.ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } + } + + public void cancel() + { + if (completed.compareAndSet(false, true)) + { + MutationTrackingService.instance.unregisterSyncCoordinator(this); + completionFuture.setFailure(new RuntimeException("Sync cancelled")); + } + } + + /** + * Tracks sync state for a single shard. + */ + private static class ShardSyncState + { + private final Shard shard; Review Comment: This reference to the shard can become stale because `Shard.withParticipants` can be called. That can also add or remove logs, and it can result in `withParticipants` being called on the logs themselves. This ties into how repair handles topology changes in general. I think the right behavior for now is to detect the change and fail the repair. We can come back and do something more involved later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

