This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-547
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit d338daa4e229bd1be9429a75076db15ecf966304
Author: Willem Jiang <jiangni...@huawei.com>
AuthorDate: Mon May 7 17:36:49 2018 +0800

    SCB-547 try to fix the EventScanner multiple AbortedGlobalTx issue
---
 .../apache/servicecomb/saga/alpha/core/EventScanner.java    | 13 ++++++++++---
 .../servicecomb/saga/alpha/core/TxEventRepository.java      |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java            |  2 +-
 .../saga/alpha/server/SpringTxEventRepository.java          |  2 +-
 .../saga/alpha/server/TxEventEnvelopeRepository.java        |  2 +-
 5 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 3168870..8082ab3 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -25,6 +25,7 @@ import static 
org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.slf4j.Logger;
@@ -141,20 +142,26 @@ public class EventScanner implements Runnable {
   }
 
   private void updateTransactionStatus() {
-    
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd);
+    
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
   }
 
   private void markSagaEnded(TxEvent event) {
     if 
(commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
-      markGlobalTxEnd(event);
+      markGlobalTxEntWithEvent(event);
     }
   }
 
-  private void markGlobalTxEnd(TxEvent event) {
+  private void markGlobalTxEntWithEvent(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
     LOG.info("Marked end of transaction with globalTxId {}", 
event.globalTxId());
   }
 
+  private void markGlobalTxEndWithEvents(List<TxEvent> events) {
+    events.forEach(event -> {
+      markGlobalTxEntWithEvent(event);
+    });
+  }
+
   private TxEvent toTxAbortedEvent(TxTimeout timeout) {
     return new TxEvent(
         timeout.serviceName(),
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index c481226..f2cccca 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -42,7 +42,7 @@ public interface TxEventRepository {
    * </ol>
    * @return
    */
-  Optional<TxEvent> findFirstAbortedGlobalTransaction();
+  Optional<List<TxEvent>> findFirstAbortedGlobalTransaction();
 
   /**
    * Find timeout {@link TxEvent}s. A timeout TxEvent satisfies below 
requirements:
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 8faf0e8..b0c19c8 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -49,7 +49,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+    public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() {
       return Optional.empty();
     }
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index e48a780..951e533 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -40,7 +40,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+  public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() {
     return eventRepo.findFirstAbortedGlobalTxByType();
   }
 
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 6a8a263..fdf920f 100644
--- 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -46,7 +46,7 @@ interface TxEventEnvelopeRepository extends 
CrudRepository<TxEvent, Long> {
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxStartedEvent') = 0 "
       + "OR t.globalTxId = t.localTxId)")
-  Optional<TxEvent> findFirstAbortedGlobalTxByType();
+  Optional<List<TxEvent>> findFirstAbortedGlobalTxByType();
 
   @Query("SELECT t FROM TxEvent t "
       + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "

-- 
To stop receiving notification emails like this one, please contact
ningji...@apache.org.

Reply via email to