sanpwc commented on code in PR #1815:
URL: https://github.com/apache/ignite-3/pull/1815#discussion_r1148908575
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java:
##########
@@ -256,6 +261,17 @@ private CompletableFuture<RaftGroupService>
startRaftGroupServiceInternal(
);
}
+
+ /**
+ * Gets a future that completes when all committed updates are applied to
state machine.
Review Comment:
Here and elsewhere: I'd add that this only applies to the node startup process.
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java:
##########
@@ -123,6 +124,14 @@ public synchronized Node start() {
return this.node;
}
+ /**
+ * Gets a future to apply committed revisions.
Review Comment:
Here and elsewhere: why is it 'to'?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -1116,38 +1106,53 @@ public void onApplied( long lastAppliedLogIndex) {
return false;
}
- // set state to follower
- this.state = State.STATE_FOLLOWER;
+ logApplyComplition.whenComplete((committedIdx, err) -> {
+ if (err != null) {
+ LOG.error("Fail to apply committed updates.", err);
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("Node {} init, term={}, lastLogId={}, conf={},
oldConf={}.", getNodeId(), this.currTerm,
- this.logManager.getLastLogId(false), this.conf.getConf(),
this.conf.getOldConf());
- }
+ // set state to follower
+ this.state = State.STATE_FOLLOWER;
- if (this.snapshotExecutor != null &&
this.options.getSnapshotIntervalSecs() > 0) {
- LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(),
this.currTerm);
- this.snapshotTimer.start();
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Node {} init, term={}, lastLogId={}, conf={},
oldConf={}.", getNodeId(), this.currTerm,
+ this.logManager.getLastLogId(false), this.conf.getConf(),
this.conf.getOldConf());
+ }
- if (!this.conf.isEmpty()) {
- stepDown(this.currTerm, false, new Status());
- }
+ if (this.snapshotExecutor != null &&
this.options.getSnapshotIntervalSecs() > 0) {
+ LOG.debug("Node {} start snapshot timer, term={}.",
getNodeId(), this.currTerm);
+ this.snapshotTimer.start();
+ }
- // Now the raft node is started , have to acquire the writeLock to
avoid race
- // conditions
- this.writeLock.lock();
- if (this.conf.isStable() && this.conf.getConf().size() == 1 &&
this.conf.getConf().contains(this.serverId)) {
- // The group contains only this server which must be the LEADER,
trigger
- // the timer immediately.
- electSelf();
- }
- else {
- this.writeLock.unlock();
- }
+ if (!this.conf.isEmpty()) {
+ stepDown(this.currTerm, false, new Status());
+ }
+
+ // Now the raft node is started , have to acquire the writeLock to
avoid race
+ // conditions
+ this.writeLock.lock();
+ if (this.conf.isStable() && this.conf.getConf().size() == 1 &&
this.conf.getConf().contains(this.serverId)) {
+ // The group contains only this server which must be the
LEADER, trigger
+ // the timer immediately.
+ electSelf();
+ }
+ else {
+ this.writeLock.unlock();
+ }
+
+ applyCommittedFuture.complete(commitIdx);
Review Comment:
Should we complete the future exceptionally in case of any error? Otherwise it
seems we may hang forever.
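To illustrate the concern, here is a minimal sketch of how the downstream future could be completed on every path. It is not the PR's code: the class `StartupFutureSketch`, the method `chain`, and the `init` callback (standing in for the follower initialization done in `NodeImpl`) are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class StartupFutureSketch {
    /**
     * Chains a downstream future to the log-apply future so that it is always completed.
     * The init callback is a hypothetical stand-in for the post-apply node initialization.
     */
    public static CompletableFuture<Long> chain(CompletableFuture<Long> logApplyCompletion, Runnable init) {
        CompletableFuture<Long> applyCommittedFuture = new CompletableFuture<>();

        logApplyCompletion.whenComplete((committedIdx, err) -> {
            if (err != null) {
                // Propagate the failure instead of only logging it, so waiters do not hang.
                applyCommittedFuture.completeExceptionally(err);
                return;
            }

            try {
                init.run();
                applyCommittedFuture.complete(committedIdx);
            } catch (Throwable t) {
                // A failure during initialization must also complete the future.
                applyCommittedFuture.completeExceptionally(t);
            }
        });

        return applyCommittedFuture;
    }
}
```

With this shape, a caller blocked on the returned future observes the error rather than waiting indefinitely.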
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -457,6 +457,14 @@ public boolean startRaftNode(
}
}
+ @Override
+ public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId
groupId) {
+ RaftGroupService jraftNode = nodes.entrySet().stream().filter(entry ->
entry.getKey().groupId().equals(groupId))
Review Comment:
Are you sure that there will be only one node with the given groupId? It seems
that this should be true for AI, but not for GG because of learners. Could you
please create a ticket for GG if it's possible to have multiple nodes there?
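One way to make the single-node assumption explicit is to fail fast when the lookup matches more than one entry. The sketch below is an assumption-laden illustration, not the actual `JraftServerImpl` code: `GroupLookupSketch`, `NodeKey`, and `singleNodeForGroup` are hypothetical names, and the key is simplified to a group id plus a peer string.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupLookupSketch {
    /** Hypothetical key: a raft node identified by its group and its peer. */
    public static final class NodeKey {
        public final String groupId;
        public final String peer;

        public NodeKey(String groupId, String peer) {
            this.groupId = groupId;
            this.peer = peer;
        }
    }

    /** Returns the only node of the given group, or throws if there are zero or several. */
    public static <V> V singleNodeForGroup(Map<NodeKey, V> nodes, String groupId) {
        List<V> matches = nodes.entrySet().stream()
            .filter(entry -> entry.getKey().groupId.equals(groupId))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

        if (matches.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one node for group " + groupId + ", found " + matches.size());
        }

        return matches.get(0);
    }
}
```

If several nodes per group turn out to be valid (for example, a peer and a learner on the same server), the method would instead need to combine their futures rather than throw.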
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java:
##########
@@ -57,7 +58,7 @@ public class RaftGroupService {
/**
* The raft node.
*/
- private Node node;
+ private NodeImpl node;
Review Comment:
Why?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -1082,20 +1084,8 @@ public void onApplied( long lastAppliedLogIndex) {
fsmCaller.addLastAppliedLogIndexListener(lnsr);
fsmCaller.onCommitted(commitIdx);
-
- try {
- if (!externalAwaitStorageLatch) {
- applyCommitLatch.await();
- }
- } catch (InterruptedException e) {
- LOG.error("Fail to apply committed updates.", e);
-
- return false;
- }
} else {
- if (externalAwaitStorageLatch) {
- opts.getStorageReadyLatch().countDown();
- }
+ logApplyComplition.complete(0L);
Review Comment:
Why is it 0? Even though there were no pending records in the log, the
storage may have its own checkpointed state with a non-zero applied index. BTW,
how do we use the revision, the one that is used to complete the future?