ifesdjeen commented on code in PR #3416:
URL: https://github.com/apache/cassandra/pull/3416#discussion_r1679571554
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -688,6 +866,182 @@ public Id nodeId()
return node.id();
}
+ @Nullable
+ @Override
+ public Long minEpoch(Collection<TokenRange> ranges)
+ {
+ return node.topology().minEpoch();
+// TxnId txnId = node.nextTxnId(Kind.ExclusiveSyncPoint, Routable.Domain.Range);
Review Comment:
Nit: this commented-out code should probably be removed, but I'm missing the
context to understand why it might have been needed.
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -358,6 +464,79 @@ public synchronized void startup()
state = State.STARTED;
}
+ private static boolean isSyncComplete(Ranges ranges)
+ {
+ for (Range range : ranges)
+ {
+ TokenRange tr = (TokenRange) range;
+ if (!tr.isFullRange())
+ return false;
+ }
+ return true;
+ }
+
+ private List<ClusterMetadata> discoverHistoric(Node node, ClusterMetadataService cms, OptionalLong optMaxEpoch)
+ {
+ ClusterMetadata current = cms.metadata();
+ Topology topology = AccordTopology.createAccordTopology(current);
+ Ranges localRanges = topology.rangesForNode(node.id());
+ if (!localRanges.isEmpty()) // already joined, nothing to see here
+ return Collections.emptyList();
+
+ Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
+ for (KeyspaceMetadata keyspace : current.schema.getKeyspaces())
+ {
+ List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
+ if (tables.isEmpty())
+ continue;
+ DataPlacement placement = current.placements.get(keyspace.params.replication);
+ DataPlacement whenSettled = current.writePlacementAllSettled(keyspace);
+ Sets.SetView<InetAddressAndPort> alive = Sets.intersection(whenSettled.writes.byEndpoint().keySet(), placement.writes.byEndpoint().keySet());
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+ whenSettled.writes.forEach((range, group) -> {
+ if (group.endpoints().contains(self))
+ {
+ for (InetAddressAndPort peer : group.endpoints())
+ {
+ if (!alive.contains(peer)) continue;
+ for (TableMetadata table : tables)
+ peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
+ }
+ }
+ });
+ }
+ if (peers.isEmpty())
+ return Collections.emptyList();
+
+ Long minEpoch = findMinEpoch(MessagingService.instance(), peers, optMaxEpoch);
+ if (minEpoch == null)
+ return Collections.emptyList();
+ List<ClusterMetadata> history = new ArrayList<>(Math.toIntExact(current.epoch.getEpoch() - minEpoch));
+ for (long epoch = minEpoch; epoch < current.epoch.getEpoch(); epoch++)
+ history.add(cms.loadHistoricEpoch(Epoch.create(epoch)));
Review Comment:
This approach will only be able to reconstruct epochs from the local log. If
we want an arbitrary epoch, we will need to go to the CMS node and ask it to
reconstruct it from the distributed log.
There may unfortunately be extremely rare cases where there's an epoch gap
(such as during disaster recovery), too.
##########
test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java:
##########
@@ -26,140 +26,69 @@
import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+//TODO (now, correctness): one network changes the port where as multiple network changes the address... yet AbstractNodeProvisionStrategy always changes the port too!
@Shared(inner = INTERFACES)
public interface INodeProvisionStrategy
{
- enum Strategy
+ @FunctionalInterface
+ interface Factory
+ {
+ INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap);
+ }
+ enum Strategy implements Factory
{
OneNetworkInterface
{
@Override
- INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap)
+ public INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap)
{
String ipAdress = "127.0." + subnet + ".1";
- return new INodeProvisionStrategy()
+ return new AbstractNodeProvisionStrategy(portMap)
{
- @Override
- public String seedIp()
- {
- return ipAdress;
- }
@Override
- public int seedPort()
+ public int seedNodeNum()
{
- return storagePort(1);
+ return 1;
}
@Override
public String ipAddress(int nodeNum)
{
return ipAdress;
}
-
- @Override
- public int storagePort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("storagePort@node"
+ nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7011 + nodeNum));
- }
- return 7011 + nodeNum;
- }
-
- @Override
- public int nativeTransportPort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 9041 + nodeNum));
- }
- return 9041 + nodeNum;
- }
-
- @Override
- public int jmxPort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("jmxPort@node" +
nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7199 + nodeNum));
- }
- return 7199 + nodeNum;
- }
};
}
},
MultipleNetworkInterfaces
{
@Override
- INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap)
+ public INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap)
{
String ipPrefix = "127.0." + subnet + '.';
- return new INodeProvisionStrategy()
+ return new AbstractNodeProvisionStrategy(portMap)
{
@Override
- public String seedIp()
- {
- return ipPrefix + '1';
- }
-
- @Override
- public int seedPort()
+ public int seedNodeNum()
{
- return storagePort(1);
+ return 1;
}
@Override
public String ipAddress(int nodeNum)
{
return ipPrefix + nodeNum;
}
-
- @Override
- public int storagePort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("storagePort@node"
+ nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7012));
- }
- return 7012;
- }
-
- @Override
- public int nativeTransportPort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 9042));
- }
- return 9042;
- }
-
- @Override
- public int jmxPort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("jmxPort@node" + nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7199));
- }
- return 7199;
- }
};
}
};
-
- INodeProvisionStrategy create(int subnet)
- {
- return create(subnet, null);
- }
-
- abstract INodeProvisionStrategy create(int subnet, @Nullable Map<String, Integer> portMap);
}
- String seedIp();
-
- int seedPort();
+// String seedIp();
Review Comment:
Nit: cleanup?
##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -246,28 +247,27 @@ private <O> O mapReduceForRange(Routables<?> keysOrRanges, Ranges slice, BiFunct
{
if (commandsForRanges == null)
return accumulate;
+ CommandsForRanges cfr = commandsForRanges.current().slice(slice);
Review Comment:
Should we do the same in `mapReduceForKey`? There we also have
```
for (Key key : commandsForKeys.keySet())
```
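Something along these lines, roughly (untested sketch; I'm assuming `mapReduceForKey`
receives the same `slice` parameter and that `Ranges#contains` is the right check for
a single key):
```java
for (Key key : commandsForKeys.keySet())
{
    if (!slice.contains(key)) // skip keys outside the sliced ranges
        continue;
    // ... existing accumulation logic for this key
}
```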
##########
src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+public class FetchMinEpoch
+{
+ public static final IVersionedSerializer<FetchMinEpoch> serializer = new IVersionedSerializer<>()
+ {
+
+ @Override
+ public void serialize(FetchMinEpoch t, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeUnsignedVInt32(t.ranges.size());
+ for (TokenRange range : t.ranges)
+ TokenRange.serializer.serialize(range, out, version);
+ }
+
+ @Override
+ public FetchMinEpoch deserialize(DataInputPlus in, int version) throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ List<TokenRange> ranges = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ ranges.add(TokenRange.serializer.deserialize(in, version));
+ return new FetchMinEpoch(ranges);
+ }
+
+ @Override
+ public long serializedSize(FetchMinEpoch t, int version)
+ {
+ int size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
+ for (TokenRange range : t.ranges)
+ size += TokenRange.serializer.serializedSize(range, version);
+ return size;
+ }
+ };
+ public static final IVerbHandler<FetchMinEpoch> handler = new IVerbHandler<FetchMinEpoch>()
+ {
+ @Override
+ public void doVerb(Message<FetchMinEpoch> message) throws IOException
+ {
+ Long epoch = AccordService.instance().minEpoch(message.payload.ranges);
+ MessagingService.instance().respond(new Response(epoch), message);
+ }
+ };
+ private final Collection<TokenRange> ranges;
+
+ public FetchMinEpoch(Collection<TokenRange> ranges)
+ {
+ this.ranges = ranges;
+ }
+
+ public static Future<Long> fetch(MessageDelivery messaging, Map<InetAddressAndPort, Set<TokenRange>> peers)
Review Comment:
Just to make sure: do we want to just ignore non-successes, or do we want to
retry on failure? I'd say the latter. There's a retry function in TCM,
`sendWithCallbackAsync`, which could be useful.
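Something along these lines, for example (a very rough, untested sketch; the verb
constant, the `Response` payload field, and the exact callback plumbing are
assumptions on my part):
```java
private static Future<Long> fetchWithRetry(MessageDelivery messaging, InetAddressAndPort to, FetchMinEpoch req, int maxAttempts)
{
    AsyncPromise<Long> promise = new AsyncPromise<>();
    class Attempt implements RequestCallback<Response>
    {
        int attempts;

        void send()
        {
            attempts++;
            // verb name below is a placeholder for whatever this PR registers
            messaging.sendWithCallback(Message.out(Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req), to, this);
        }

        @Override
        public boolean invokeOnFailure()
        {
            return true; // make sure onFailure fires so we get a chance to retry
        }

        @Override
        public void onResponse(Message<Response> msg)
        {
            promise.trySuccess(msg.payload.minEpoch); // field name assumed
        }

        @Override
        public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
        {
            if (attempts < maxAttempts)
                send(); // could add backoff here
            else
                promise.tryFailure(new RuntimeException("Unable to fetch min epoch from " + from + ": " + reason));
        }
    }
    new Attempt().send();
    return promise;
}
```
Or, if `sendWithCallbackAsync` already wraps this kind of retry loop, reusing it
directly would be even simpler.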
##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -365,21 +367,29 @@ public static void startup(Supplier<Transformation> initialTransformation, boole
{
ClusterMetadata metadata = ClusterMetadata.current();
NodeId self = metadata.myNodeId();
- AccordService.startup(self);
// finish in-progress sequences first
InProgressSequences.finishInProgressSequences(self);
metadata = ClusterMetadata.current();
- switch (metadata.directory.peerState(self))
+ NodeState startingstate = metadata.directory.peerState(self);
+ switch (startingstate)
+ {
+ case REGISTERED:
+ case LEFT:
+ break;
+ default:
+ AccordService.startup(self);
+ }
+ switch (startingstate)
Review Comment:
I'm a bit confused by this change. Above we're exiting on `REGISTERED` and
`LEFT`, and then we go back to the `switch` where we do startup on those. It
feels like the intention was to only leave the below version in; could you
check this out?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -358,6 +464,79 @@ public synchronized void startup()
state = State.STARTED;
}
+ private static boolean isSyncComplete(Ranges ranges)
+ {
+ for (Range range : ranges)
+ {
+ TokenRange tr = (TokenRange) range;
+ if (!tr.isFullRange())
+ return false;
+ }
+ return true;
+ }
+
+ private List<ClusterMetadata> discoverHistoric(Node node, ClusterMetadataService cms, OptionalLong optMaxEpoch)
+ {
+ ClusterMetadata current = cms.metadata();
+ Topology topology = AccordTopology.createAccordTopology(current);
+ Ranges localRanges = topology.rangesForNode(node.id());
+ if (!localRanges.isEmpty()) // already joined, nothing to see here
+ return Collections.emptyList();
+
+ Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
+ for (KeyspaceMetadata keyspace : current.schema.getKeyspaces())
+ {
+ List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
+ if (tables.isEmpty())
+ continue;
+ DataPlacement placement = current.placements.get(keyspace.params.replication);
Review Comment:
I _think_ if you are only looking at write placements, you do not need the
`whenSettled`, since we first expand, then remove from read, then remove old.
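i.e. something like this, reusing the names from this method (just a sketch of
what I mean; I may well be missing why `alive` needs the settled placement):
```java
DataPlacement placement = current.placements.get(keyspace.params.replication);
InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
placement.writes.forEach((range, group) -> {
    if (!group.endpoints().contains(self))
        return;
    // every write replica we currently share a range with is a candidate peer
    for (InetAddressAndPort peer : group.endpoints())
        for (TableMetadata table : tables)
            peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id));
});
```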
##########
src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+public class FetchMinEpoch
+{
+ public static final IVersionedSerializer<FetchMinEpoch> serializer = new IVersionedSerializer<>()
+ {
+
+ @Override
+ public void serialize(FetchMinEpoch t, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeUnsignedVInt32(t.ranges.size());
+ for (TokenRange range : t.ranges)
+ TokenRange.serializer.serialize(range, out, version);
+ }
+
+ @Override
+ public FetchMinEpoch deserialize(DataInputPlus in, int version) throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ List<TokenRange> ranges = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ ranges.add(TokenRange.serializer.deserialize(in, version));
+ return new FetchMinEpoch(ranges);
+ }
+
+ @Override
+ public long serializedSize(FetchMinEpoch t, int version)
+ {
+ int size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
Review Comment:
nit: should we use `long` for consistency with the return value?
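i.e. something like:
```java
long size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
for (TokenRange range : t.ranges)
    size += TokenRange.serializer.serializedSize(range, version);
return size;
```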
##########
src/java/org/apache/cassandra/service/accord/TokenRange.java:
##########
@@ -48,13 +48,28 @@ public TokenRange(AccordRoutingKey start, AccordRoutingKey end)
public TableId table()
{
- return ((AccordRoutingKey) start()).table();
+ return getStart().table();
+ }
+
+ public AccordRoutingKey getStart()
Review Comment:
It feels like the distinction between `start` and `getStart` is getting very
blurry. I suggest making the `start` method non-final and downcasting to
`AccordRoutingKey` in an override here, keeping the `start`/`end` names.
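Roughly this (sketch; assumes `start()`/`end()` can be made non-final in the
parent class and that the covariant overrides work out here):
```java
@Override
public AccordRoutingKey start()
{
    return (AccordRoutingKey) super.start();
}

@Override
public AccordRoutingKey end()
{
    return (AccordRoutingKey) super.end();
}
```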
##########
src/java/org/apache/cassandra/service/accord/AccordTopology.java:
##########
@@ -134,22 +138,34 @@ private static KeyspaceShard forRange(KeyspaceMetadata keyspace, Range<Token> ra
.map(directory::peerId)
.map(AccordTopology::tcmIdToAccord)
.collect(Collectors.toSet());
+ Set<Node.Id> leaving = allLeaving.isEmpty() ?
+ Collections.emptySet() :
+ allLeaving.stream()
+ .filter(e -> writeEndpoints.contains(e))
+ .map(directory::peerId)
+ .map(AccordTopology::tcmIdToAccord)
+ .collect(Collectors.toSet());
- return new KeyspaceShard(keyspace, range, nodes, pending);
+ return new KeyspaceShard(keyspace, range, nodes, pending, leaving);
}
- public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata keyspace, DataPlacements placements, Directory directory, ShardLookup lookup)
+ public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata keyspace,
+ Function<KeyspaceMetadata, DataPlacement> placements,
+ Function<KeyspaceMetadata, DataPlacement> settledPlacements,
+ Directory directory, ShardLookup lookup)
{
- ReplicationParams replication = keyspace.params.replication;
- DataPlacement placement = placements.get(replication);
+// ReplicationParams replication = keyspace.params.replication;
+ DataPlacement placement = placements.apply(keyspace);
+ DataPlacement settled = settledPlacements.apply(keyspace);
+ Set<InetAddressAndPort> leaving = Sets.difference(placement.writes.byEndpoint().keySet(), settled.writes.byEndpoint().keySet());
Review Comment:
FWIW, we could integrate more closely and have `AccordTopology` inside TCM.
I somehow thought this was also the plan at some point.
##########
test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java:
##########
@@ -1095,7 +1096,27 @@ public void close()
.filter(i -> !i.isShutdown())
.map(IInstance::shutdown)
.collect(Collectors.toList());
- FBUtilities.waitOnFutures(futures,1L, TimeUnit.MINUTES);
+ try
+ {
+ FBUtilities.waitOnFutures(futures,1L, TimeUnit.MINUTES);
+ }
+ catch (RuntimeException t)
+ {
+ if (t.getCause() instanceof TimeoutException)
Review Comment:
Super useful! Great idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]