funky-eyes commented on code in PR #7133:
URL: https://github.com/apache/incubator-seata/pull/7133#discussion_r1957481470


##########
server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java:
##########
@@ -631,6 +644,64 @@ private void committingSchedule(long delay) {
             delay, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Handle rollbacked by scheduled.
+     */
+    protected void handleEndStatesByScheduled() {
+        SessionCondition sessionCondition = new SessionCondition(endStatuses);
+        sessionCondition.setLazyLoadBranch(true);
+        List<GlobalSession> rollbackedSessions =
+            
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+        if (CollectionUtils.isEmpty(rollbackedSessions)) {
+            rollbackedSchedule(RETRY_DEAD_THRESHOLD);
+            return;
+        }
+        long delay = END_STATUS_RETRY_PERIOD;
+        
rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+        List<GlobalSession> needDoRollbackedSessions = new ArrayList<>();
+        for (GlobalSession rollbackedSession : rollbackedSessions) {
+            long time = rollbackedSession.timeToDeadSession();
+            if (time <= 0) {
+                needDoRollbackedSessions.add(rollbackedSession);
+            } else {
+                delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+                break;
+            }
+        }
+        long now = System.currentTimeMillis();
+        SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> {
+            try {
+                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, 
rollbackedSession.getBeginTime())) {
+                    handleEndStateSession(rollbackedSession);
+                }
+            } catch (TransactionException ex) {
+                LOGGER.error("Failed to handle rollbacked [{}] {} {}", 
rollbackedSession.getXid(), ex.getCode(), ex.getMessage());
+            }
+        });
+        rollbackedSchedule(delay);

Review Comment:
   ```suggestion
           endSchedule(delay);
   ```



##########
server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java:
##########
@@ -631,6 +644,64 @@ private void committingSchedule(long delay) {
             delay, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Handle rollbacked by scheduled.
+     */
+    protected void handleEndStatesByScheduled() {
+        SessionCondition sessionCondition = new SessionCondition(endStatuses);
+        sessionCondition.setLazyLoadBranch(true);
+        List<GlobalSession> rollbackedSessions =
+            
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+        if (CollectionUtils.isEmpty(rollbackedSessions)) {
+            rollbackedSchedule(RETRY_DEAD_THRESHOLD);
+            return;
+        }
+        long delay = END_STATUS_RETRY_PERIOD;
+        
rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+        List<GlobalSession> needDoRollbackedSessions = new ArrayList<>();
+        for (GlobalSession rollbackedSession : rollbackedSessions) {
+            long time = rollbackedSession.timeToDeadSession();
+            if (time <= 0) {
+                needDoRollbackedSessions.add(rollbackedSession);
+            } else {
+                delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+                break;
+            }
+        }
+        long now = System.currentTimeMillis();
+        SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> {
+            try {
+                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, 
rollbackedSession.getBeginTime())) {
+                    handleEndStateSession(rollbackedSession);
+                }
+            } catch (TransactionException ex) {
+                LOGGER.error("Failed to handle rollbacked [{}] {} {}", 
rollbackedSession.getXid(), ex.getCode(), ex.getMessage());
+            }
+        });
+        rollbackedSchedule(delay);
+    }
+
+    private void handleEndStateSession(GlobalSession globalSession) throws 
TransactionException {
+        GlobalStatus status = globalSession.getStatus();
+
+        if (status == GlobalStatus.CommitFailed) {

Review Comment:
   There is more than one commit end state, and the same is true of the 
rollback end states. I suggest putting the handling of the various end states 
directly in SessionHelper and then calling it from handleEndStateSession. The 
only input parameter needed is a GlobalSession.



##########
server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java:
##########
@@ -631,6 +644,64 @@ private void committingSchedule(long delay) {
             delay, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Handle rollbacked by scheduled.
+     */
+    protected void handleEndStatesByScheduled() {
+        SessionCondition sessionCondition = new SessionCondition(endStatuses);
+        sessionCondition.setLazyLoadBranch(true);
+        List<GlobalSession> rollbackedSessions =
+            
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+        if (CollectionUtils.isEmpty(rollbackedSessions)) {
+            rollbackedSchedule(RETRY_DEAD_THRESHOLD);
+            return;
+        }
+        long delay = END_STATUS_RETRY_PERIOD;
+        
rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+        List<GlobalSession> needDoRollbackedSessions = new ArrayList<>();
+        for (GlobalSession rollbackedSession : rollbackedSessions) {
+            long time = rollbackedSession.timeToDeadSession();
+            if (time <= 0) {
+                needDoRollbackedSessions.add(rollbackedSession);
+            } else {
+                delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+                break;
+            }
+        }
+        long now = System.currentTimeMillis();
+        SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> {
+            try {
+                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, 
rollbackedSession.getBeginTime())) {
+                    handleEndStateSession(rollbackedSession);
+                }
+            } catch (TransactionException ex) {
+                LOGGER.error("Failed to handle rollbacked [{}] {} {}", 
rollbackedSession.getXid(), ex.getCode(), ex.getMessage());
+            }
+        });
+        rollbackedSchedule(delay);
+    }
+
+    private void handleEndStateSession(GlobalSession globalSession) throws 
TransactionException {
+        GlobalStatus status = globalSession.getStatus();
+
+        if (status == GlobalStatus.CommitFailed) {
+            SessionHelper.endCommitted(globalSession, true);
+            return;
+        }
+        SessionHelper.endRollbacked(globalSession, true);
+    }
+
+    private void rollbackedSchedule(long delay) {

Review Comment:
   ```suggestion
       private void endSchedule(long delay) {
   ```



##########
server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java:
##########
@@ -631,6 +644,64 @@ private void committingSchedule(long delay) {
             delay, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Handle rollbacked by scheduled.
+     */
+    protected void handleEndStatesByScheduled() {
+        SessionCondition sessionCondition = new SessionCondition(endStatuses);
+        sessionCondition.setLazyLoadBranch(true);
+        List<GlobalSession> rollbackedSessions =
+            
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+        if (CollectionUtils.isEmpty(rollbackedSessions)) {
+            rollbackedSchedule(RETRY_DEAD_THRESHOLD);
+            return;
+        }
+        long delay = END_STATUS_RETRY_PERIOD;
+        
rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+        List<GlobalSession> needDoRollbackedSessions = new ArrayList<>();
+        for (GlobalSession rollbackedSession : rollbackedSessions) {
+            long time = rollbackedSession.timeToDeadSession();
+            if (time <= 0) {
+                needDoRollbackedSessions.add(rollbackedSession);
+            } else {
+                delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+                break;
+            }
+        }
+        long now = System.currentTimeMillis();
+        SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> {
+            try {
+                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, 
rollbackedSession.getBeginTime())) {
+                    handleEndStateSession(rollbackedSession);
+                }
+            } catch (TransactionException ex) {
+                LOGGER.error("Failed to handle rollbacked [{}] {} {}", 
rollbackedSession.getXid(), ex.getCode(), ex.getMessage());
+            }
+        });
+        rollbackedSchedule(delay);
+    }
+
+    private void handleEndStateSession(GlobalSession globalSession) throws 
TransactionException {
+        GlobalStatus status = globalSession.getStatus();
+
+        if (status == GlobalStatus.CommitFailed) {
+            SessionHelper.endCommitted(globalSession, true);
+            return;
+        }
+        SessionHelper.endRollbacked(globalSession, true);
+    }
+
+    private void rollbackedSchedule(long delay) {
+        syncProcessing.schedule(
+            () -> {
+                boolean called = 
SessionHolder.distributedLockAndExecute(ROLLBACKED, 
this::handleEndStatesByScheduled);

Review Comment:
   ```suggestion
                   boolean called = 
SessionHolder.distributedLockAndExecute(END, this::handleEndStatesByScheduled);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to