This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch SCB-547 in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit d338daa4e229bd1be9429a75076db15ecf966304 Author: Willem Jiang <jiangni...@huawei.com> AuthorDate: Mon May 7 17:36:49 2018 +0800 SCB-547 try to fix the EventScanner multiple AbortedGlobalTx issue --- .../apache/servicecomb/saga/alpha/core/EventScanner.java | 13 ++++++++++--- .../servicecomb/saga/alpha/core/TxEventRepository.java | 2 +- .../saga/alpha/core/TxConsistentServiceTest.java | 2 +- .../saga/alpha/server/SpringTxEventRepository.java | 2 +- .../saga/alpha/server/TxEventEnvelopeRepository.java | 2 +- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 3168870..8082ab3 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -25,6 +25,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; @@ -141,20 +142,26 @@ public class EventScanner implements Runnable { } private void updateTransactionStatus() { - eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd); + eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents); } private void markSagaEnded(TxEvent event) { if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { - markGlobalTxEnd(event); + markGlobalTxEntWithEvent(event); } } - private void markGlobalTxEnd(TxEvent event) { + private void markGlobalTxEntWithEvent(TxEvent event) { eventRepository.save(toSagaEndedEvent(event)); LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } + private void markGlobalTxEndWithEvents(List<TxEvent> events) { + events.forEach(event -> { + markGlobalTxEntWithEvent(event); + }); + } + private TxEvent toTxAbortedEvent(TxTimeout timeout) { return new TxEvent( timeout.serviceName(), diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index c481226..f2cccca 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -42,7 +42,7 @@ public interface TxEventRepository { * </ol> * @return */ - Optional<TxEvent> findFirstAbortedGlobalTransaction(); + Optional<List<TxEvent>> findFirstAbortedGlobalTransaction(); /** * Find timeout {@link TxEvent}s. A timeout TxEvent satisfies below requirements: diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 8faf0e8..b0c19c8 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -49,7 +49,7 @@ public class TxConsistentServiceTest { } @Override - public Optional<TxEvent> findFirstAbortedGlobalTransaction() { + public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() { return Optional.empty(); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index e48a780..951e533 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -40,7 +40,7 @@ class SpringTxEventRepository implements TxEventRepository { } @Override - public Optional<TxEvent> findFirstAbortedGlobalTransaction() { + public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() { return eventRepo.findFirstAbortedGlobalTxByType(); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 6a8a263..fdf920f 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -46,7 +46,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> { + " AND t2.localTxId = t.localTxId " + " AND t2.type = 'TxStartedEvent') = 0 " + "OR t.globalTxId = t.localTxId)") - Optional<TxEvent> findFirstAbortedGlobalTxByType(); + Optional<List<TxEvent>> findFirstAbortedGlobalTxByType(); @Query("SELECT t FROM TxEvent t " + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') " -- To stop receiving notification emails like this one, please contact ningji...@apache.org.