ifesdjeen commented on code in PR #4461:
URL: https://github.com/apache/cassandra/pull/4461#discussion_r2491909990
##########
.gitmodules:
##########
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/apache/cassandra-accord.git
- branch = trunk
+ url = https://github.com/belliottsmith/cassandra-accord.git
Review Comment:
Just to make sure you have this on your radar: the submodule URL now points
at a private fork/branch rather than apache/cassandra-accord.
##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable,
TopologyListener
+{
+ public static final Logger logger =
LoggerFactory.getLogger(AccordTopologyService.class);
+
+ // TODO (expected): move syncPropagator and watermarkCollector out of this
class (and merge them)
+ private final AccordSyncPropagator syncPropagator;
+ private final WatermarkCollector watermarkCollector;
+
+ private SortedArrayList<Node.Id> previouslyRemovedIds =
SortedArrayList.ofSorted();
+
+ private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+ @GuardedBy("this")
+ private State state = State.INITIALIZED;
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus
scheduledTasks)
+ {
+ this.syncPropagator = new AccordSyncPropagator(node, endpointMapper,
messagingService, scheduledTasks);
+ this.watermarkCollector = new WatermarkCollector();
+ }
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper)
+ {
+ this(node, endpointMapper, MessagingService.instance(),
ScheduledExecutors.scheduledTasks);
+ }
+
+ /**
+ * On restart, loads topologies. On bootstrap, discovers existing
topologies and initializes the node.
+ */
+ public void onStartup(Node node)
+ {
+ SortedArrayList<Node.Id> removed =
node.topology().current().removedIds();
+ synchronized (this)
+ {
+ Invariants.require(state == State.INITIALIZED, "Expected state to
be INITIALIZED but was %s", state);
+ state = State.STARTED;
+ previouslyRemovedIds = removed;
+ node.topology().addListener(watermarkCollector);
+ node.topology().addListener(syncPropagator);
+ }
+ syncPropagator.onNodesRemoved(removed);
+ }
+
+ @Override
+ public synchronized boolean isTerminated()
+ {
+ return state == State.SHUTDOWN;
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ if (isTerminated())
+ return;
+ state = State.SHUTDOWN;
+ }
+
+ @Override
+ public Object shutdownNow()
+ {
+ shutdown();
+ return null;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
+ {
+ return isTerminated();
+ }
+
+ @Override
+ public void onReceived(Topology topology)
+ {
+ SortedArrayList<Node.Id> newlyRemoved;
+ synchronized (this)
+ {
+ newlyRemoved = topology.removedIds().without(previouslyRemovedIds);
+ previouslyRemovedIds =
topology.removedIds().with(previouslyRemovedIds);
+ }
+ syncPropagator.onNodesRemoved(newlyRemoved);
+ }
+
+ @Override
+ public AsyncResult<Topology> fetchTopologyForEpoch(long epoch)
+ {
+ SettableByCallback<Topology> result = new SettableByCallback<>();
+ fetchTopologyAsync(epoch, result);
+ return result;
+ }
+
+ private void fetchTopologyAsync(long epoch, BiConsumer<? super Topology, ?
super Throwable> onResult)
+ {
+ // It's not safe for this to block on CMS so for now pick a thread
pool to handle it
+ Stage.ACCORD_MIGRATION.execute(() -> {
+ try
+ {
+ if (ClusterMetadata.current().epoch.getEpoch() < epoch)
Review Comment:
nit: we can move the `metadata` variable declaration from L167 to just before
the `try` to save a call to `ClusterMetadata.current()`; it also makes it
clearer that we always work with just one epoch here.
##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable,
TopologyListener
+{
+ public static final Logger logger =
LoggerFactory.getLogger(AccordTopologyService.class);
+
+ // TODO (expected): move syncPropagator and watermarkCollector out of this
class (and merge them)
+ private final AccordSyncPropagator syncPropagator;
+ private final WatermarkCollector watermarkCollector;
+
+ private SortedArrayList<Node.Id> previouslyRemovedIds =
SortedArrayList.ofSorted();
+
+ private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+ @GuardedBy("this")
+ private State state = State.INITIALIZED;
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus
scheduledTasks)
+ {
+ this.syncPropagator = new AccordSyncPropagator(node, endpointMapper,
messagingService, scheduledTasks);
+ this.watermarkCollector = new WatermarkCollector();
+ }
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper)
+ {
+ this(node, endpointMapper, MessagingService.instance(),
ScheduledExecutors.scheduledTasks);
+ }
+
+ /**
+ * On restart, loads topologies. On bootstrap, discovers existing
topologies and initializes the node.
+ */
+ public void onStartup(Node node)
+ {
+ SortedArrayList<Node.Id> removed =
node.topology().current().removedIds();
+ synchronized (this)
+ {
+ Invariants.require(state == State.INITIALIZED, "Expected state to
be INITIALIZED but was %s", state);
+ state = State.STARTED;
+ previouslyRemovedIds = removed;
+ node.topology().addListener(watermarkCollector);
+ node.topology().addListener(syncPropagator);
+ }
+ syncPropagator.onNodesRemoved(removed);
+ }
+
+ @Override
+ public synchronized boolean isTerminated()
+ {
+ return state == State.SHUTDOWN;
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ if (isTerminated())
+ return;
+ state = State.SHUTDOWN;
+ }
+
+ @Override
+ public Object shutdownNow()
+ {
+ shutdown();
+ return null;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
+ {
+ return isTerminated();
+ }
+
+ @Override
+ public void onReceived(Topology topology)
+ {
+ SortedArrayList<Node.Id> newlyRemoved;
+ synchronized (this)
+ {
+ newlyRemoved = topology.removedIds().without(previouslyRemovedIds);
+ previouslyRemovedIds =
topology.removedIds().with(previouslyRemovedIds);
+ }
+ syncPropagator.onNodesRemoved(newlyRemoved);
+ }
+
+ @Override
+ public AsyncResult<Topology> fetchTopologyForEpoch(long epoch)
+ {
+ SettableByCallback<Topology> result = new SettableByCallback<>();
+ fetchTopologyAsync(epoch, result);
+ return result;
+ }
+
+ private void fetchTopologyAsync(long epoch, BiConsumer<? super Topology, ?
super Throwable> onResult)
Review Comment:
Previously, we would issue just one call per epoch. I think this behavior is
useful and we should probably preserve it, WDYT?
##########
test/unit/org/apache/cassandra/service/accord/AccordTopologyServiceTest.java:
##########
@@ -0,0 +1,297 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
Review Comment:
This test file looks like it is commented out. Should we remove it?
##########
src/java/org/apache/cassandra/service/accord/AccordTopologyService.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.TopologyListener;
+import accord.api.TopologyService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.topology.TopologyRetiredException;
+import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableByCallback;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
+
+@Simulate(with=MONITORS)
+public class AccordTopologyService implements TopologyService, Shutdownable,
TopologyListener
+{
+ public static final Logger logger =
LoggerFactory.getLogger(AccordTopologyService.class);
+
+ // TODO (expected): move syncPropagator and watermarkCollector out of this
class (and merge them)
+ private final AccordSyncPropagator syncPropagator;
+ private final WatermarkCollector watermarkCollector;
+
+ private SortedArrayList<Node.Id> previouslyRemovedIds =
SortedArrayList.ofSorted();
+
+ private enum State { INITIALIZED, STARTED, SHUTDOWN }
+
+ @GuardedBy("this")
+ private State state = State.INITIALIZED;
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper, MessageDelivery messagingService, ScheduledExecutorPlus
scheduledTasks)
+ {
+ this.syncPropagator = new AccordSyncPropagator(node, endpointMapper,
messagingService, scheduledTasks);
+ this.watermarkCollector = new WatermarkCollector();
+ }
+
+ public AccordTopologyService(Node.Id node, AccordEndpointMapper
endpointMapper)
+ {
+ this(node, endpointMapper, MessagingService.instance(),
ScheduledExecutors.scheduledTasks);
+ }
+
+ /**
+ * On restart, loads topologies. On bootstrap, discovers existing
topologies and initializes the node.
+ */
+ public void onStartup(Node node)
+ {
+ SortedArrayList<Node.Id> removed =
node.topology().current().removedIds();
+ synchronized (this)
+ {
+ Invariants.require(state == State.INITIALIZED, "Expected state to
be INITIALIZED but was %s", state);
+ state = State.STARTED;
+ previouslyRemovedIds = removed;
+ node.topology().addListener(watermarkCollector);
Review Comment:
nit: we can probably move these `addListener` calls outside the lock? (Not
that it matters much, though.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]