funky-eyes commented on code in PR #7133: URL: https://github.com/apache/incubator-seata/pull/7133#discussion_r1957481470
########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -631,6 +644,64 @@ private void committingSchedule(long delay) { delay, TimeUnit.MILLISECONDS); } + /** + * Handle rollbacked by scheduled. + */ + protected void handleEndStatesByScheduled() { + SessionCondition sessionCondition = new SessionCondition(endStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackedSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackedSessions)) { + rollbackedSchedule(RETRY_DEAD_THRESHOLD); + return; + } + long delay = END_STATUS_RETRY_PERIOD; + rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackedSessions = new ArrayList<>(); + for (GlobalSession rollbackedSession : rollbackedSessions) { + long time = rollbackedSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackedSessions.add(rollbackedSession); + } else { + delay = Math.max(time, END_STATUS_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackedSession.getBeginTime())) { + handleEndStateSession(rollbackedSession); + } + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacked [{}] {} {}", rollbackedSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackedSchedule(delay); Review Comment: ```suggestion endSchedule(delay); ``` ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -631,6 +644,64 @@ private void committingSchedule(long delay) { delay, TimeUnit.MILLISECONDS); } + /** + * Handle rollbacked by scheduled. 
+ */ + protected void handleEndStatesByScheduled() { + SessionCondition sessionCondition = new SessionCondition(endStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackedSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackedSessions)) { + rollbackedSchedule(RETRY_DEAD_THRESHOLD); + return; + } + long delay = END_STATUS_RETRY_PERIOD; + rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackedSessions = new ArrayList<>(); + for (GlobalSession rollbackedSession : rollbackedSessions) { + long time = rollbackedSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackedSessions.add(rollbackedSession); + } else { + delay = Math.max(time, END_STATUS_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackedSession.getBeginTime())) { + handleEndStateSession(rollbackedSession); + } + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacked [{}] {} {}", rollbackedSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackedSchedule(delay); + } + + private void handleEndStateSession(GlobalSession globalSession) throws TransactionException { + GlobalStatus status = globalSession.getStatus(); + + if (status == GlobalStatus.CommitFailed) { Review Comment: The commit state is not limited to just one, and the rollback state is the same. I suggest putting the handling of various end states directly in the SessionHelper, and then calling it from handleEndStateSession. 
The only input parameter needed is a GlobalSession. ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -631,6 +644,64 @@ private void committingSchedule(long delay) { delay, TimeUnit.MILLISECONDS); } + /** + * Handle rollbacked by scheduled. + */ + protected void handleEndStatesByScheduled() { + SessionCondition sessionCondition = new SessionCondition(endStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackedSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackedSessions)) { + rollbackedSchedule(RETRY_DEAD_THRESHOLD); + return; + } + long delay = END_STATUS_RETRY_PERIOD; + rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackedSessions = new ArrayList<>(); + for (GlobalSession rollbackedSession : rollbackedSessions) { + long time = rollbackedSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackedSessions.add(rollbackedSession); + } else { + delay = Math.max(time, END_STATUS_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackedSession.getBeginTime())) { + handleEndStateSession(rollbackedSession); + } + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacked [{}] {} {}", rollbackedSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackedSchedule(delay); + } + + private void handleEndStateSession(GlobalSession globalSession) throws TransactionException { + GlobalStatus status = globalSession.getStatus(); + + if (status == GlobalStatus.CommitFailed) { + SessionHelper.endCommitted(globalSession, true); + return; + } + SessionHelper.endRollbacked(globalSession, true); + } + + private void rollbackedSchedule(long delay) { Review Comment: 
```suggestion private void endSchedule(long delay) { ``` ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -631,6 +644,64 @@ private void committingSchedule(long delay) { delay, TimeUnit.MILLISECONDS); } + /** + * Handle rollbacked by scheduled. + */ + protected void handleEndStatesByScheduled() { + SessionCondition sessionCondition = new SessionCondition(endStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackedSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackedSessions)) { + rollbackedSchedule(RETRY_DEAD_THRESHOLD); + return; + } + long delay = END_STATUS_RETRY_PERIOD; + rollbackedSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackedSessions = new ArrayList<>(); + for (GlobalSession rollbackedSession : rollbackedSessions) { + long time = rollbackedSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackedSessions.add(rollbackedSession); + } else { + delay = Math.max(time, END_STATUS_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackedSessions, rollbackedSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackedSession.getBeginTime())) { + handleEndStateSession(rollbackedSession); + } + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacked [{}] {} {}", rollbackedSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackedSchedule(delay); + } + + private void handleEndStateSession(GlobalSession globalSession) throws TransactionException { + GlobalStatus status = globalSession.getStatus(); + + if (status == GlobalStatus.CommitFailed) { + SessionHelper.endCommitted(globalSession, true); + return; + } + SessionHelper.endRollbacked(globalSession, true); + } + + private void rollbackedSchedule(long delay) { + 
syncProcessing.schedule( + () -> { + boolean called = SessionHolder.distributedLockAndExecute(ROLLBACKED, this::handleEndStatesByScheduled); Review Comment: ```suggestion boolean called = SessionHolder.distributedLockAndExecute(END, this::handleEndStatesByScheduled); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org