ifesdjeen commented on code in PR #4464:
URL: https://github.com/apache/cassandra/pull/4464#discussion_r2508178394
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -414,38 +395,125 @@ public synchronized void startup()
{
if (state != State.INIT)
return;
- journal.start(node);
- node.load();
- ClusterMetadata metadata = ClusterMetadata.current();
- endpointMapper.updateMapping(metadata);
+ node.unsafeSetReplaying(true);
+ try
+ {
+ journal.start(node);
+ node.load();
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ endpointMapper.updateMapping(metadata);
+
+ List<TopologyUpdate> images = journal.replayTopologies();
+ if (!images.isEmpty())
+ {
+            // Initialise command stores using latest topology from the log;
+            // if there are no local command stores, don't report any topologies and simply fetch the latest known in the cluster
+            // this avoids a registered (not joined) node learning of topologies, then later restarting with some intervening
+            // epochs having been garbage collected by the other nodes in the cluster
+ TopologyUpdate last = images.get(images.size() - 1);
+ if (!last.commandStores.isEmpty())
+ {
+ node.commandStores().initializeTopologyUnsafe(last);
+
+ // Replay local epochs
+ for (TopologyUpdate image : images)
+ node.topology().reportTopology(image.global);
+ }
+ }
+ replayJournal(this);
+ }
+ finally
+ {
+ node.unsafeSetReplaying(false);
+ }
- List<TopologyUpdate> images = journal.replayTopologies();
- if (!images.isEmpty())
+ finishInitialization();
+ catchup();
+
+ fastPathCoordinator.start();
+        ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
+
+        node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
+                                               Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
+                                               Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
+        node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)), SECONDS);
+        // Only enable durability scheduling _after_ we have fully replayed journal
+ node.durability().start();
+ state = State.STARTED;
+ }
+
+ void catchup()
Review Comment:
Maybe it would make sense to disambiguate the name: we already have TCM
catchup and topology catchup, too.
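For example (purely an illustrative sketch; `catchupWithPeersOnStart` is a
hypothetical name, not one this PR uses):

    void catchupWithPeersOnStart()
    {
        AccordSpec spec = DatabaseDescriptor.getAccord();
        if (!spec.catchup_on_start)
        {
            logger.info("Not catching up with peers");
            return;
        }
        // ... rest of the existing catchup() body unchanged ...
    }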
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -414,38 +395,125 @@ public synchronized void startup()
{
if (state != State.INIT)
return;
- journal.start(node);
- node.load();
- ClusterMetadata metadata = ClusterMetadata.current();
- endpointMapper.updateMapping(metadata);
+ node.unsafeSetReplaying(true);
+ try
+ {
+ journal.start(node);
+ node.load();
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ endpointMapper.updateMapping(metadata);
+
+ List<TopologyUpdate> images = journal.replayTopologies();
+ if (!images.isEmpty())
+ {
+            // Initialise command stores using latest topology from the log;
+            // if there are no local command stores, don't report any topologies and simply fetch the latest known in the cluster
+            // this avoids a registered (not joined) node learning of topologies, then later restarting with some intervening
+            // epochs having been garbage collected by the other nodes in the cluster
+ TopologyUpdate last = images.get(images.size() - 1);
+ if (!last.commandStores.isEmpty())
+ {
+ node.commandStores().initializeTopologyUnsafe(last);
+
+ // Replay local epochs
+ for (TopologyUpdate image : images)
+ node.topology().reportTopology(image.global);
+ }
+ }
+ replayJournal(this);
+ }
+ finally
+ {
+ node.unsafeSetReplaying(false);
+ }
- List<TopologyUpdate> images = journal.replayTopologies();
- if (!images.isEmpty())
+ finishInitialization();
+ catchup();
+
+ fastPathCoordinator.start();
+        ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
+
+        node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
+                                               Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
+                                               Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
+        node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)), SECONDS);
+        // Only enable durability scheduling _after_ we have fully replayed journal
+ node.durability().start();
+ state = State.STARTED;
+ }
+
+ void catchup()
+ {
+ AccordSpec spec = DatabaseDescriptor.getAccord();
+ if (!spec.catchup_on_start)
{
- // Initialise command stores using latest topology from the log;
-            // if there are no local command stores, don't report any topologies and simply fetch the latest known in the cluster
-            // this avoids a registered (not joined) node learning of topologies, then later restarting with some intervening
-            // epochs having been garbage collected by the other nodes in the cluster
- TopologyUpdate last = images.get(images.size() - 1);
- if (!last.commandStores.isEmpty())
+ logger.info("Not catching up with peers");
+ return;
+ }
+
+ BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
+ if (bootstrapState == COMPLETED)
+ {
+            long maxLatencyNanos = spec.catchup_on_start_fail_latency.toNanoseconds();
+ int attempts = 1;
+ while (true)
{
- node.commandStores().initializeTopologyUnsafe(last);
+ logger.info("Catching up with quorum...");
+ long start = nanoTime();
+ long failAt = start + maxLatencyNanos;
+ Future<Void> f = toFuture(Catchup.catchup(node));
+ if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+ {
+ if (spec.catchup_on_start_exit_on_failure)
+ {
+                        logger.error("Catch up exceeded maximum latency of {}ns; shutting down", maxLatencyNanos);
+                        throw new RuntimeException("Could not catch up with peers");
+ }
+                    logger.error("Catch up exceeded maximum latency of {}ns; starting up", maxLatencyNanos);
+ break;
+ }
+
+ Throwable failed = f.cause();
+ if (failed != null)
+                    throw new RuntimeException("Could not catch up with pers", failed);
Review Comment:
nit: peers
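Suggested fix (only the string literal changes):

    throw new RuntimeException("Could not catch up with peers", failed);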
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]