liuqiufeng commented on code in PR #6499: URL: https://github.com/apache/incubator-seata/pull/6499#discussion_r1580727523
########## server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java: ########## @@ -488,6 +488,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } + /** + * Handle rollbacking by scheduled. + */ + protected void handleRollbackingByScheduled() { + long delay = ROLLBACKING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> rollbackingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(rollbackingSessions)) { + rollbackingSchedule(delay); + return; + } + rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoRollbackingSessions = new ArrayList<>(); + for (GlobalSession rollbackingSession : rollbackingSessions) { + long time = rollbackingSession.timeToDeadSession(); + if (time <= 0) { + needDoRollbackingSessions.add(rollbackingSession); + } else { + delay = Math.max(time, ROLLBACKING_RETRY_PERIOD); + break; + } + } + long now = System.currentTimeMillis(); + SessionHelper.forEach(needDoRollbackingSessions, rollbackingSession -> { + try { + if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { + if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { + rollbackingSession.clean(); + } + + SessionHelper.endRollbackFailed(rollbackingSession, true, true); + + //The function of this 'return' is 'continue'. + return; + } + core.doGlobalRollback(rollbackingSession, true); + } catch (TransactionException ex) { + LOGGER.error("Failed to handle rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage()); + } + }); + rollbackingSchedule(delay); + } + + private void rollbackingSchedule(long delay) { + syncProcessing.schedule( + () -> { + boolean called = SessionHolder.distributedLockAndExecute(ROLLBACKING, this::handleRollbackingByScheduled); + if (!called) { + rollbackingSchedule(ROLLBACKING_RETRY_PERIOD); + } + }, + delay, TimeUnit.MILLISECONDS); + } + + /** + * Handle committing by scheduled. + */ + protected void handleCommittingByScheduled() { + long delay = COMMITTING_RETRY_PERIOD; + SessionCondition sessionCondition = new SessionCondition(committingStatuses); + sessionCondition.setLazyLoadBranch(true); + List<GlobalSession> committingSessions = + SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); + if (CollectionUtils.isEmpty(committingSessions)) { + committingSchedule(delay); + return; + } + committingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime)); + List<GlobalSession> needDoCommittingSessions = new ArrayList<>(); + for (GlobalSession committingSession : committingSessions) { + long time = committingSession.timeToDeadSession(); + if (time <= 0) { + needDoCommittingSessions.add(committingSession); + } else { + delay = Math.max(time, COMMITTING_RETRY_PERIOD); Review Comment: > 这里应该记录delay,做break即可,然后foreach后的时间直接用这个时间,否则可能会出现多个定时任务在同时运行 The delay should be recorded here, just do break, and then use this time directly after foreach, otherwise multiple scheduled tasks may run at the same time done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org