funky-eyes commented on code in PR #6499: URL: https://github.com/apache/incubator-seata/pull/6499#discussion_r1580648480
########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -488,6 +488,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } + /** + * Handle rollbacking by scheduled. + */ + protected void handleRollbackingByScheduled() { + long delay = ROLLBACKING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackingSessions)) { + rollbackingSchedule(delay); + return; + } + rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackingSessions = new ArrayList<>(); + for (GlobalSession rollbackingSession : rollbackingSessions) { + long time = rollbackingSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackingSessions.add(rollbackingSession); + } else { + delay = Math.max(time, ROLLBACKING_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackingSessions, rollbackingSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { + if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { + rollbackingSession.clean(); + } + + SessionHelper.endRollbackFailed(rollbackingSession, true, true); + + //The function of this 'return' is 'continue'. + return; + } + core.doGlobalRollback(rollbackingSession, true); + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackingSchedule(delay); + } + + private void rollbackingSchedule(long delay) { + syncProcessing.schedule( + () -> { + boolean called = SessionHolder.distributedLockAndExecute(ROLLBACKING, this::handleRollbackingByScheduled); + if (!called) { + rollbackingSchedule(ROLLBACKING_RETRY_PERIOD); + } + }, + delay, TimeUnit.MILLISECONDS); + } + + /** + * Handle committing by scheduled. + */ + protected void handleCommittingByScheduled() { + long delay = COMMITTING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(committingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> committingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(committingSessions)) { + committingSchedule(delay); Review Comment: If empty, delay should be RETRY_DEAD_THRESHOLD value ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -488,6 +488,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } + /** + * Handle rollbacking by scheduled. + */ + protected void handleRollbackingByScheduled() { + long delay = ROLLBACKING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackingSessions)) { + rollbackingSchedule(delay); + return; + } + rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackingSessions = new ArrayList<>(); + for (GlobalSession rollbackingSession : rollbackingSessions) { + long time = rollbackingSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackingSessions.add(rollbackingSession); + } else { + delay = Math.max(time, ROLLBACKING_RETRY_PERIOD); Review Comment: 这里应该记录delay,做break即可,然后foreach后的时间直接用这个时间,否则可能会出现多个定时任务在同时运行 The delay should be recorded here, just do break, and then use this time directly after foreach, otherwise multiple scheduled tasks may run at the same time ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -488,6 +488,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } + /** + * Handle rollbacking by scheduled. + */ + protected void handleRollbackingByScheduled() { + long delay = ROLLBACKING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackingSessions)) { + rollbackingSchedule(delay); Review Comment: 如果为空,delay应该是RETRY_DEAD_THRESHOLD的值 If empty, delay should be RETRY_DEAD_THRESHOLD value ########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -488,6 +488,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } + /** + * Handle rollbacking by scheduled. + */ + protected void handleRollbackingByScheduled() { + long delay = ROLLBACKING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackingSessions)) { + rollbackingSchedule(delay); + return; + } + rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackingSessions = new ArrayList<>(); + for (GlobalSession rollbackingSession : rollbackingSessions) { + long time = rollbackingSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackingSessions.add(rollbackingSession); + } else { + delay = Math.max(time, ROLLBACKING_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackingSessions, rollbackingSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { + if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { + rollbackingSession.clean(); + } + + SessionHelper.endRollbackFailed(rollbackingSession, true, true); + + //The function of this 'return' is 'continue'. + return; + } + core.doGlobalRollback(rollbackingSession, true); + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackingSchedule(delay); + } + + private void rollbackingSchedule(long delay) { + syncProcessing.schedule( + () -> { + boolean called = SessionHolder.distributedLockAndExecute(ROLLBACKING, this::handleRollbackingByScheduled); + if (!called) { + rollbackingSchedule(ROLLBACKING_RETRY_PERIOD); + } + }, + delay, TimeUnit.MILLISECONDS); + } + + /** + * Handle committing by scheduled. + */ + protected void handleCommittingByScheduled() { + long delay = COMMITTING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(committingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> committingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(committingSessions)) { + committingSchedule(delay); + return; + } + committingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoCommittingSessions = new ArrayList<>(); + for (GlobalSession committingSession : committingSessions) { + long time = committingSession.timeToDeadSession(); + if (time <= 0) { + needDoCommittingSessions.add(committingSession); + } else { + delay = Math.max(time, COMMITTING_RETRY_PERIOD); Review Comment: 这里应该记录delay,做break即可,然后foreach后的时间直接用这个时间,否则可能会出现多个定时任务在同时运行 The delay should be recorded here, just do break, and then use this time directly after foreach, otherwise multiple scheduled tasks may run at the same time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org