ifesdjeen commented on code in PR #4460:
URL: https://github.com/apache/cassandra/pull/4460#discussion_r2485718399
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -288,8 +287,16 @@ public synchronized static AccordService startup(NodeId
tcmId)
AccordService as = new
AccordService(AccordTopology.tcmIdToAccord(tcmId));
unsafeInstance = as;
- as.startup();
- replayJournal(as);
+ as.node.unsafeSetReplaying(true);
+ try
+ {
+ as.startup();
Review Comment:
I don't think this should matter, but maybe it's worth moving `startup` out
of here? I think the issue was only with log replay.
##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -663,6 +676,97 @@ private static void addRow(PartitionsCollector collector,
int executorId, String
}
}
+ public static final class ListenersDepsTable extends
AbstractLazyVirtualTable
+ {
+ private ListenersDepsTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, LISTENERS_DEPS,
+ "Accord Txn Dependency Listeners",
+ "CREATE TABLE %s (\n" +
+ " command_store_id int,\n" +
+ " waiting_on 'TxnIdUtf8Type',\n" +
+ " waiting_until text,\n" +
+ " waiter 'TxnIdUtf8Type',\n" +
+ " PRIMARY KEY (command_store_id, waiting_on,
waiting_until, waiter)" +
+ ')', Int32Type.instance), FAIL, UNSORTED, ASC);
+ }
+
+ @Override
+ public void collect(PartitionsCollector collector)
+ {
+ AccordCommandStores stores = (AccordCommandStores)
AccordService.unsafeInstance().node().commandStores();
+ Object[] pk = collector.singlePartitionKey();
+ if (pk != null)
+ {
+ CommandStore commandStore = stores.forId((Integer)pk[0]);
+ if (commandStore != null)
+ addPartition(commandStore, collector);
+ return;
+ }
+ stores.forAllUnsafe(commandStore -> addPartition(commandStore,
collector));
+ }
+
+ private void addPartition(CommandStore commandStore,
PartitionsCollector collector)
+ {
+ collector.partition(commandStore.id())
+ .collect(rows -> {
+ // TODO (desired): support maybe execute immediately
with safeStore
+
AccordService.getBlocking(commandStore.chain((PreLoadContext.Empty)
metadata::toString, safeStore -> { addRows(safeStore, rows); }));
+ });
+ }
+
+ private void addRows(SafeCommandStore safeStore, RowsCollector rows)
+ {
+ LocalListeners listeners =
safeStore.commandStore().unsafeGetListeners();
+ for (LocalListeners.TxnListener listener :
listeners.txnListeners())
+ rows.add(listener.waitingOn.toString(),
listener.awaitingStatus.name(), listener.waiter.toString())
+ .eagerCollect(ignore -> {});
+ }
+ }
+
+ public static final class ListenersLocalTable extends
AbstractLazyVirtualTable
+ {
+ private ListenersLocalTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, LISTENERS_LOCAL,
+ "Accord Local Listeners",
+ "CREATE TABLE %s (\n" +
+ " command_store_id int,\n" +
+ " waiting_on 'TxnIdUtf8Type',\n" +
+ " waiter text,\n" +
+ " PRIMARY KEY (command_store_id, waiting_on, waiter)"
+
Review Comment:
just out of curiosity, why make all three part of the primary key? Is it
easier to init in `addPartition`?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -322,23 +329,15 @@ public static boolean replayJournal(AccordService as)
{
logger.info("Starting journal replay.");
long before = Clock.Global.nanoTime();
- CommandsForKey.disableLinearizabilityViolationsReporting();
- try
- {
- if (as.journalConfiguration().replayMode() == RESET)
- AccordKeyspace.truncateCommandsForKey();
-
- as.node.commandStores().forAllUnsafe(cs ->
cs.unsafeProgressLog().stop());
- as.journal().replay(as.node().commandStores());
- logger.info("Waiting for command stores to quiesce.");
- ((AccordCommandStores)as.node.commandStores()).waitForQuiescense();
- as.journal.unsafeSetStarted();
- as.node.commandStores().forAllUnsafe(cs ->
cs.unsafeProgressLog().start());
- }
- finally
- {
- CommandsForKey.enableLinearizabilityViolationsReporting();
- }
+ if (as.journalConfiguration().replayMode() == RESET)
+ AccordKeyspace.truncateCommandsForKey();
+
+ as.node.commandStores().forAllUnsafe(cs ->
cs.unsafeProgressLog().stop());
+ as.journal().replay(as.node().commandStores());
+ logger.info("Waiting for command stores to quiesce.");
+ ((AccordCommandStores)as.node.commandStores()).waitForQuiescense();
Review Comment:
nit (also, preexisting): should be spelled `waitForQuiescence` (`c`, not `s`)
##########
src/java/org/apache/cassandra/service/accord/AccordFastPath.java:
##########
@@ -235,14 +235,13 @@ public Epoch lastModified()
return lastModified;
}
- public ImmutableSet<Node.Id> unavailableIds()
+ public SortedArrayList<Node.Id> unavailableIds()
Review Comment:
looks like both `info` and `status` on it are immutable, i.e. the status of
unavailable cannot change for the fast path configuration; should we cache
this sorted list?
##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -1770,6 +1775,66 @@ private void cleanup(TxnId txnId, int commandStoreId,
Cleanup cleanup)
}
}
+ /**
+ * Write-only virtual table for updating Accord node states
+ */
+ public static final class NodeOpsTable extends
AbstractMutableLazyVirtualTable
+ {
+ // TODO (expected): test each of these operations
+ enum NodeOp
+ {
+ MARK_STALE("Mark the node as offline for an indeterminate period.
Peers will be allowed to garbage collect without involving the marked node, and
when it comes online it will need to rejoin the transaction log."),
+ UNMARK_STALE("Peers will no longer be allowed to garbage collect
without involving the node."),
+ MARK_HARD_REMOVED("EMERGENCY USE ONLY: Mark the node as
PERMANENTLY offline. It must be guaranteed not to respond at any future date.
If a quorum is marked HARD_REMOVED, the remaining replicas in a shard may
collectively make progress, though consistency violations are possible."),
+ ;
+
+ final String description;
+
+ NodeOp(String description)
+ {
+ this.description = description;
+ }
+ }
+ private NodeOpsTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, NODE_OPS,
+ "Update Accord Node State",
+ "CREATE TABLE %s (\n" +
+ " node_id int,\n" +
+ " op text," +
+ " PRIMARY KEY (node_id)" +
+ ')', Int32Type.instance), FAIL, UNSORTED);
+ }
+
+ @Override
+ protected void collect(PartitionsCollector collector)
+ {
+ throw new UnsupportedOperationException(NODE_OPS + " is a
write-only table");
+ }
+
+ @Override
+ protected void applyRowUpdate(Object[] partitionKeys, Object[]
clusteringKeys, ColumnMetadata[] columns, Object[] values)
+ {
+ NodeId nodeId = new NodeId((Integer) partitionKeys[0]);
+ Set<NodeId> nodeIds = Collections.singleton(nodeId);
+ NodeOp op = tryParse(values[0], true, NodeOp.class,
NodeOp::valueOf);
+
+ switch (op)
+ {
+ default: throw new UnhandledEnum(op);
+ case MARK_STALE:
+ AccordOperations.instance.accordMarkStale(nodeIds);
Review Comment:
Should we maybe add a check for node id existence?
##########
src/java/org/apache/cassandra/tcm/Startup.java:
##########
@@ -447,7 +447,15 @@ public static void startup(Supplier<Transformation>
initialTransformation, boole
}
case JOINED:
if (StorageService.isReplacingSameAddress())
+ {
+ if (DatabaseDescriptor.getAccordTransactionsEnabled())
+ {
+ // we need to support a mode that changes the NodeId
when replacing the same address for accord transaction safety
+ throw new IllegalStateException("Cannot replace same
address when accord transactions are enabled.");
Review Comment:
This is a pretty big issue (even if fixing it might just be remapping the ID in
TCM, we need to check the implications of this action); maybe worth marking it
with `TODO (required)`.
##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -2014,6 +2118,128 @@ protected void collect(PartitionsCollector collector)
}
}
+ // TODO (desired): epoch/token filtering
+ // TODO (expected): report command_store_ids for each shard
+ public static final class ShardEpochsTable extends AbstractLazyVirtualTable
+ {
+ static class ShardAndEpochs
+ {
+ final long startEpoch;
+ final Shard shard;
+ long endEpoch;
+
+ ShardAndEpochs(long startEpoch, Shard shard)
+ {
+ this.startEpoch = this.endEpoch = startEpoch;
+ this.shard = shard;
+ }
+ }
+
+ private ShardEpochsTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, SHARD_EPOCHS,
+ "Accord per-CommandStore RedundantBefore State",
+ "CREATE TABLE %s (\n" +
+ " table_id text,\n" +
+ " token_start 'TokenUtf8Type',\n" +
+ " epoch_start bigint,\n" +
+ " epoch_end bigint,\n" +
+ " peers text,\n" +
+ " peers_fast_path text,\n" +
+ " peers_hard_removed text,\n" +
+ " quorum_simple int,\n" +
+ " quorum_fast int,\n" +
+ " quorum_fast_privileged_deps int,\n" +
+ " quorum_fast_privileged_nodeps int,\n" +
+ " token_end 'TokenUtf8Type',\n" +
+ " PRIMARY KEY (table_id, token_start, epoch_start)" +
+ ')', Int32Type.instance), FAIL, ASC);
Review Comment:
Should this be utf8 type here?
##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -2014,6 +2118,128 @@ protected void collect(PartitionsCollector collector)
}
}
+ // TODO (desired): epoch/token filtering
+ // TODO (expected): report command_store_ids for each shard
+ public static final class ShardEpochsTable extends AbstractLazyVirtualTable
+ {
+ static class ShardAndEpochs
+ {
+ final long startEpoch;
+ final Shard shard;
+ long endEpoch;
+
+ ShardAndEpochs(long startEpoch, Shard shard)
+ {
+ this.startEpoch = this.endEpoch = startEpoch;
+ this.shard = shard;
+ }
+ }
+
+ private ShardEpochsTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, SHARD_EPOCHS,
+ "Accord per-CommandStore RedundantBefore State",
Review Comment:
nit: Shard Epochs, maybe was copy-pasted?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -471,7 +458,32 @@ public void finishInitialization()
TopologyRange remote = fetchTopologies(highestKnown + 1);
if (remote != null) // TODO (required): if remote.min >
highestKnown + 1, should we decide if we need to truncate our local topologies?
Probably not until startup has finished.
+ {
remote.forEach(configService::reportTopology, remote.min,
Integer.MAX_VALUE);
+ if (remote.current > highestKnown)
+ highestKnown = remote.current;
+ }
+
+ // Subscribe to TCM events, and collect any we may have missed to
report now
+ ChangeListener prevListener =
MetadataChangeListener.instance.collector.getAndSet(new ChangeListener()
+ {
+ @Override
+ public void notifyPostCommit(ClusterMetadata prev,
ClusterMetadata next, boolean fromSnapshot)
+ {
+ if (state != State.SHUTDOWN)
+ configService.maybeReportMetadata(next);
+ }
+ });
+
+ Invariants.require((prevListener instanceof
MetadataChangeListener.PreInitStateCollector),
+ "Listener should have been initialized with
Accord pre-init state collector, but was " + prevListener.getClass());
+
+ MetadataChangeListener.PreInitStateCollector preinit =
(MetadataChangeListener.PreInitStateCollector) prevListener;
Review Comment:
Would it make sense to leave this in "local" initialization? Should not
matter much, but this addition was only to preclude race conditions during
startup, in case any unknown epochs are reported.
##########
src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java:
##########
@@ -430,10 +444,12 @@ protected synchronized void onReadOk(Node.Id from,
CommandStore commandStore, Da
streamData.streams.forEach((range, streamInfo) -> {
if (streamInfo.hasData)
{
+ logger.info("StreamPlan {} created for bootstrap of {} from
{}", streamInfo.planId, range, from);
stream(streamInfo.planId).rangeReceived(range, from);
}
else
{
+ logger.info("StreamPlan {} created for bootstrap of {} from {}
with no data to stream; succeeding immediately.", streamInfo.planId, range,
from);
Review Comment:
just to make sure: was this meant to be `info`?
##########
src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java:
##########
@@ -321,6 +321,13 @@ private boolean notify(Node.Id to, Notification
notification)
switch (nodeStatus)
{
default: throw new UnhandledEnum(nodeStatus);
+ case UNHEALTHY:
+ if (!endpointMapper.isRemoved(to))
Review Comment:
could you mention that fall-through here is intentional?
##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -190,15 +186,9 @@ public synchronized void start()
{
Invariants.require(state == State.INITIALIZED, "Expected state to be
INITIALIZED but was %s", state);
state = State.STARTED;
-
- if (removedNodes != null)
- {
- // for all nodes removed, or pending removal, mark them as
removed, so we don't wait on their replies
- removedNodes.forEach(removed -> {
- onNodeRemoved(removed.removedIn.getEpoch(), currentTopology(),
new Node.Id(removed.id.id()));
- });
- removedNodes = null;
- }
+ endpointMapper.removedNodes().forEach((removed, removedIn) -> {
+ onNodeRemoved(removedIn, new Node.Id(removed.id));
Review Comment:
nit: `removed` is already a node id, should we just pass it along?
##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -1850,11 +1859,103 @@ protected void applyRowUpdate(Object[] partitionKeys,
Object[] clusteringKeys, C
}
}
- public static class TxnBlockedByTable extends AbstractLazyVirtualTable
+
+ /**
+ * Write-only virtual table for updating Accord node states
+ */
+ public static final class CommandStoreOpsTable extends
AbstractMutableLazyVirtualTable
{
- enum Reason
- {Self, Txn, Key}
+ // TODO (expected): test each of these operations
+ enum CommandStoreOp
+ {
+ SET_PROGRESS_LOG_MODE("Set the specified progress log mode."),
+ UNSET_PROGRESS_LOG_MODE("Unset the specified progress log mode."),
+ ;
+
+ final String description;
+ CommandStoreOp(String description)
+ {
+ this.description = description;
+ }
+ }
+ private CommandStoreOpsTable()
+ {
+ super(parse(VIRTUAL_ACCORD_DEBUG, COMMAND_STORE_OPS,
+ "Update Accord CommandStore State",
+ "CREATE TABLE %s (\n" +
+ " command_store_id int,\n" +
+ " op text," +
+ " param text," +
+ " PRIMARY KEY (command_store_id)" +
+ ')', Int32Type.instance), FAIL, UNSORTED);
+ }
+
+ @Override
+ protected void collect(PartitionsCollector collector)
+ {
+ throw new UnsupportedOperationException(COMMAND_STORE_OPS + " is a
write-only table");
+ }
+
+ @Override
+ protected void applyRowUpdate(Object[] partitionKeys, Object[]
clusteringKeys, ColumnMetadata[] columns, Object[] values)
+ {
+ int commandStoreId = (Integer) partitionKeys[0];
+
+ CommandStoreOp op = null;
+ String param = null;
+ for (int i = 0 ; i < columns.length ; ++i)
+ {
+ switch (columns[i].name.toString())
+ {
+ default: throw new IllegalArgumentException("Unknown
column " + columns[i].name.toString());
+ case "op":
+ op = tryParse(values[i], true, CommandStoreOp.class,
CommandStoreOp::valueOf);
+ break;
+ case "param":
+ param = (String) values[i];
+ break;
+ }
+ }
+
+ if (op == null)
+ throw new IllegalArgumentException("Must specify 'op'");
+
+ switch (op)
+ {
+ default: throw new UnhandledEnum(op);
+ case SET_PROGRESS_LOG_MODE:
+ case UNSET_PROGRESS_LOG_MODE:
+ if (param == null)
+ throw new IllegalArgumentException("Must specify
'param' for " + op);
+ ModeFlag mode = tryParse(param, true, ModeFlag.class,
ModeFlag::valueOf);
+ boolean set = op == CommandStoreOp.SET_PROGRESS_LOG_MODE;
+ if (commandStoreId < 0)
+ {
+ AccordService.instance().node()
+ .commandStores()
+ .forAllUnsafe(commandStore ->
updateMode(set, mode, commandStore));
+ }
+ else
+ {
+ updateMode(set, mode,
AccordService.instance().node().commandStores().forId(commandStoreId));
+ }
+ break;
+ }
+ }
+
+ private static void updateMode(boolean set, ModeFlag mode,
CommandStore commandStore)
+ {
+ DefaultProgressLog progressLog =
((DefaultProgressLog)commandStore.unsafeProgressLog());
+ if (set) progressLog.setMode(mode);
+ else progressLog.unsetMode(mode);
+ }
+ }
+
Review Comment:
nit: extra newlines?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -471,7 +458,32 @@ public void finishInitialization()
TopologyRange remote = fetchTopologies(highestKnown + 1);
if (remote != null) // TODO (required): if remote.min >
highestKnown + 1, should we decide if we need to truncate our local topologies?
Probably not until startup has finished.
+ {
remote.forEach(configService::reportTopology, remote.min,
Integer.MAX_VALUE);
+ if (remote.current > highestKnown)
+ highestKnown = remote.current;
+ }
+
+ // Subscribe to TCM events, and collect any we may have missed to
report now
+ ChangeListener prevListener =
MetadataChangeListener.instance.collector.getAndSet(new ChangeListener()
+ {
+ @Override
+ public void notifyPostCommit(ClusterMetadata prev,
ClusterMetadata next, boolean fromSnapshot)
+ {
+ if (state != State.SHUTDOWN)
+ configService.maybeReportMetadata(next);
+ }
+ });
+
+ Invariants.require((prevListener instanceof
MetadataChangeListener.PreInitStateCollector),
+ "Listener should have been initialized with
Accord pre-init state collector, but was " + prevListener.getClass());
+
+ MetadataChangeListener.PreInitStateCollector preinit =
(MetadataChangeListener.PreInitStateCollector) prevListener;
Review Comment:
that said, with a proper reorder buffer it should not matter terribly: what is
important is to establish the "smallest knowable epoch", which I think we do
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]