ifesdjeen commented on code in PR #4461:
URL: https://github.com/apache/cassandra/pull/4461#discussion_r2491909990


##########
.gitmodules:
##########
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/belliottsmith/cassandra-accord.git

Review Comment:
   Just to make sure you have it on your radar: private branch



##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable, 
TopologyListener
+{
+    public static final Logger logger = 
LoggerFactory.getLogger(AccordTopologyService.class);
+
+    // TODO (expected): move syncPropagator and watermarkCollector out of this 
class (and merge them)
+    private final AccordSyncPropagator syncPropagator;
+    private final WatermarkCollector watermarkCollector;
+
+    private SortedArrayList<Node.Id> previouslyRemovedIds = 
SortedArrayList.ofSorted();
+
+    private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+    @GuardedBy("this")
+    private State state = State.INITIALIZED;
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus 
scheduledTasks)
+    {
+        this.syncPropagator = new AccordSyncPropagator(node, endpointMapper, 
messagingService, scheduledTasks);
+        this.watermarkCollector = new WatermarkCollector();
+    }
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper)
+    {
+        this(node, endpointMapper, MessagingService.instance(), 
ScheduledExecutors.scheduledTasks);
+    }
+
+    /**
+     * On restart, loads topologies. On bootstrap, discovers existing 
topologies and initializes the node.
+     */
+    public void onStartup(Node node)
+    {
+        SortedArrayList<Node.Id> removed = 
node.topology().current().removedIds();
+        synchronized (this)
+        {
+            Invariants.require(state == State.INITIALIZED, "Expected state to 
be INITIALIZED but was %s", state);
+            state = State.STARTED;
+            previouslyRemovedIds = removed;
+            node.topology().addListener(watermarkCollector);
+            node.topology().addListener(syncPropagator);
+        }
+        syncPropagator.onNodesRemoved(removed);
+    }
+
+    @Override
+    public synchronized boolean isTerminated()
+    {
+        return state == State.SHUTDOWN;
+    }
+
+    @Override
+    public synchronized void shutdown()
+    {
+        if (isTerminated())
+            return;
+        state = State.SHUTDOWN;
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        shutdown();
+        return null;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
+    {
+        return isTerminated();
+    }
+
+    @Override
+    public void onReceived(Topology topology)
+    {
+        SortedArrayList<Node.Id> newlyRemoved;
+        synchronized (this)
+        {
+            newlyRemoved = topology.removedIds().without(previouslyRemovedIds);
+            previouslyRemovedIds = 
topology.removedIds().with(previouslyRemovedIds);
+        }
+        syncPropagator.onNodesRemoved(newlyRemoved);
+    }
+
+    @Override
+    public AsyncResult<Topology> fetchTopologyForEpoch(long epoch)
+    {
+        SettableByCallback<Topology> result = new SettableByCallback<>();
+        fetchTopologyAsync(epoch, result);
+        return result;
+    }
+
+    private void fetchTopologyAsync(long epoch, BiConsumer<? super Topology, ? 
super Throwable> onResult)
+    {
+        // It's not safe for this to block on CMS so for now pick a thread 
pool to handle it
+        Stage.ACCORD_MIGRATION.execute(() -> {
+            try
+            {
+                if (ClusterMetadata.current().epoch.getEpoch() < epoch)

Review Comment:
   nit: can move `metadata` variable declaration from L167 and put it before 
`try` to save a call, also useful for clarity that we always have just one 
epoch here.



##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable, 
TopologyListener
+{
+    public static final Logger logger = 
LoggerFactory.getLogger(AccordTopologyService.class);
+
+    // TODO (expected): move syncPropagator and watermarkCollector out of this 
class (and merge them)
+    private final AccordSyncPropagator syncPropagator;
+    private final WatermarkCollector watermarkCollector;
+
+    private SortedArrayList<Node.Id> previouslyRemovedIds = 
SortedArrayList.ofSorted();
+
+    private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+    @GuardedBy("this")
+    private State state = State.INITIALIZED;
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus 
scheduledTasks)
+    {
+        this.syncPropagator = new AccordSyncPropagator(node, endpointMapper, 
messagingService, scheduledTasks);
+        this.watermarkCollector = new WatermarkCollector();
+    }
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper)
+    {
+        this(node, endpointMapper, MessagingService.instance(), 
ScheduledExecutors.scheduledTasks);
+    }
+
+    /**
+     * On restart, loads topologies. On bootstrap, discovers existing 
topologies and initializes the node.
+     */
+    public void onStartup(Node node)
+    {
+        SortedArrayList<Node.Id> removed = 
node.topology().current().removedIds();
+        synchronized (this)
+        {
+            Invariants.require(state == State.INITIALIZED, "Expected state to 
be INITIALIZED but was %s", state);
+            state = State.STARTED;
+            previouslyRemovedIds = removed;
+            node.topology().addListener(watermarkCollector);
+            node.topology().addListener(syncPropagator);
+        }
+        syncPropagator.onNodesRemoved(removed);
+    }
+
+    @Override
+    public synchronized boolean isTerminated()
+    {
+        return state == State.SHUTDOWN;
+    }
+
+    @Override
+    public synchronized void shutdown()
+    {
+        if (isTerminated())
+            return;
+        state = State.SHUTDOWN;
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        shutdown();
+        return null;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
+    {
+        return isTerminated();
+    }
+
+    @Override
+    public void onReceived(Topology topology)
+    {
+        SortedArrayList<Node.Id> newlyRemoved;
+        synchronized (this)
+        {
+            newlyRemoved = topology.removedIds().without(previouslyRemovedIds);
+            previouslyRemovedIds = 
topology.removedIds().with(previouslyRemovedIds);
+        }
+        syncPropagator.onNodesRemoved(newlyRemoved);
+    }
+
+    @Override
+    public AsyncResult<Topology> fetchTopologyForEpoch(long epoch)
+    {
+        SettableByCallback<Topology> result = new SettableByCallback<>();
+        fetchTopologyAsync(epoch, result);
+        return result;
+    }
+
+    private void fetchTopologyAsync(long epoch, BiConsumer<? super Topology, ? 
super Throwable> onResult)

Review Comment:
   previously, we would issue just one call per epoch, and I think this 
behavior is useful, and we should probably preserve it, WDYT?



##########
test/unit/org/apache/cassandra/service/accord/AccordTopologyServiceTest.java:
##########
@@ -0,0 +1,297 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file

Review Comment:
   Should we remove this one?



##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable, 
TopologyListener
+{
+    public static final Logger logger = 
LoggerFactory.getLogger(AccordTopologyService.class);
+
+    // TODO (expected): move syncPropagator and watermarkCollector out of this 
class (and merge them)
+    private final AccordSyncPropagator syncPropagator;
+    private final WatermarkCollector watermarkCollector;
+
+    private SortedArrayList<Node.Id> previouslyRemovedIds = 
SortedArrayList.ofSorted();
+
+    private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+    @GuardedBy("this")
+    private State state = State.INITIALIZED;
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus 
scheduledTasks)
+    {
+        this.syncPropagator = new AccordSyncPropagator(node, endpointMapper, 
messagingService, scheduledTasks);
+        this.watermarkCollector = new WatermarkCollector();
+    }
+
+    public AccordTopologyService(Node.Id node, AccordEndpointMapper 
endpointMapper)
+    {
+        this(node, endpointMapper, MessagingService.instance(), 
ScheduledExecutors.scheduledTasks);
+    }
+
+    /**
+     * On restart, loads topologies. On bootstrap, discovers existing 
topologies and initializes the node.
+     */
+    public void onStartup(Node node)
+    {
+        SortedArrayList<Node.Id> removed = 
node.topology().current().removedIds();
+        synchronized (this)
+        {
+            Invariants.require(state == State.INITIALIZED, "Expected state to 
be INITIALIZED but was %s", state);
+            state = State.STARTED;
+            previouslyRemovedIds = removed;
+            node.topology().addListener(watermarkCollector);

Review Comment:
   nit: probably can move these outside the lock? (not that this matters though)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to