ibessonov commented on code in PR #1263:
URL: https://github.com/apache/ignite-3/pull/1263#discussion_r1007884863


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -284,19 +288,32 @@ private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd) {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        storage.flush();
+        // We need to take the max index here because otherwise there is a scenario on a node restart when a node will install
+        // snapshot instead of starting even if all data is presented. This might happen because a raft log is truncated up
+        // to the maximal index among storages after a snapshot is done in raft,
+        // and because we take minimal applied index among storages for the local recovery. In case of a cluster restart,
+        // it is possible that cluster even can't start, if we won't take max index.

Review Comment:
   This description is very convoluted and possibly partly wrong; please
   rewrite it. I don't think mentioning the leader is necessary here. This code
   matters only for local recovery and for the scenario where a node falsely
   fails even though it actually has all the required data.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -108,19 +108,20 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
         });
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
         iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) -> {
             Command command = clo.command();
 
             long commandIndex = clo.index();
 
-            long storageAppliedIndex = storage.lastAppliedIndex();
+            // We choose the minimum applied index, since we choose it (the minimum one) on local recovery so as not to lose the data for
+            // one of the storages.
+            long storagesAppliedIndex = Math.min(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
 
-            assert storageAppliedIndex < commandIndex
-                    : "Pending write command has a higher index than already processed commands [commandIndex=" + commandIndex
-                    + ", storageAppliedIndex=" + storageAppliedIndex + ']';
+            assert commandIndex > storagesAppliedIndex :

Review Comment:
   I would add both indexes to the assertion message.
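
   Something along these lines, as a rough sketch only. The class and method
   names (`AppliedIndexAssertion`, `message`, `check`) and the field labels in
   the message are hypothetical, chosen purely to show the shape of the message;
   the real code would keep using `assert` inside `onWrite`.

```java
// Hypothetical helper, not part of PartitionListener: it only illustrates the message shape.
final class AppliedIndexAssertion {
    /** Builds a message that reports the command index and the applied index of each storage. */
    static String message(long commandIndex, long mvAppliedIndex, long txStateAppliedIndex) {
        return "Write command must have an index greater than the minimum applied index of the storages"
                + " [commandIndex=" + commandIndex
                + ", mvAppliedIndex=" + mvAppliedIndex
                + ", txStateAppliedIndex=" + txStateAppliedIndex + ']';
    }

    /** Same condition as in the diff: the command index must exceed the minimum applied index. */
    static void check(long commandIndex, long mvAppliedIndex, long txStateAppliedIndex) {
        long storagesAppliedIndex = Math.min(mvAppliedIndex, txStateAppliedIndex);

        if (commandIndex <= storagesAppliedIndex) {
            throw new AssertionError(message(commandIndex, mvAppliedIndex, txStateAppliedIndex));
        }
    }
}
```

   Reporting both values separately, rather than only their minimum, makes it
   clear from the failure which storage is lagging.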



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -147,6 +148,7 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
      * Handler for the {@link UpdateCommand}.
      *
      * @param cmd Command.
+     * @param commandIndex Index of the RAFT command.
      */
     private void handleUpdateCommand(UpdateCommand cmd, long commandIndex) {
         storage.runConsistently(() -> {

Review Comment:
   I don't see anywhere that already-applied commands are skipped.
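
   To make the concern concrete: since recovery starts from the minimum applied
   index, a replayed command may carry an index that one of the storages has
   already applied, so I'd expect a per-storage guard roughly like the sketch
   below. `FakeStorage` and `applyIfNew` are hypothetical stand-ins for
   illustration, not the real storage API.

```java
import java.util.ArrayList;
import java.util.List;

final class SkipAppliedSketch {
    /** Hypothetical stand-in for a storage that tracks its last applied RAFT index. */
    static final class FakeStorage {
        long lastAppliedIndex;

        final List<String> applied = new ArrayList<>();

        /**
         * Applies the payload only if this storage has not seen the command yet.
         *
         * @return {@code true} if the command was applied, {@code false} if it was skipped.
         */
        boolean applyIfNew(long commandIndex, String payload) {
            if (commandIndex <= lastAppliedIndex) {
                // Already applied before the restart; replaying it must be a no-op.
                return false;
            }

            applied.add(payload);
            lastAppliedIndex = commandIndex;

            return true;
        }
    }
}
```

   With a guard like this in each handler, replaying the log from the minimum
   index would not re-apply data to the storage that is ahead.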


