This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0aaa3fde7da0b56ab109140576179cff1a70d9ea
Author: seanyinx <sean....@huawei.com>
AuthorDate: Wed Jan 10 21:07:30 2018 +0800

    SCB-212 tx timeout impl
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 omega/omega-transaction/pom.xml                    |  4 ++
 .../omega/transaction/CompensableInterceptor.java  | 19 +++++---
 .../omega/transaction/EventAwareInterceptor.java   | 20 ++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 54 ++++++++++++++++++++++
 .../saga/omega/transaction/TransactionAspect.java  | 46 ++++++------------
 .../transaction/CompensableInterceptorTest.java    | 21 +++++++--
 .../transaction/TimeAwareInterceptorTest.java}     |  8 ++--
 .../omega/transaction/TransactionAspectTest.java   | 44 ++++++++++++++++--
 8 files changed, 163 insertions(+), 53 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 90c00e9..1829650 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -59,6 +59,10 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index b443c4d..76193cd 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -17,26 +17,31 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+
 class CompensableInterceptor implements EventAwareInterceptor {
+  private final OmegaContext context;
   private final MessageSender sender;
 
-  CompensableInterceptor(MessageSender sender) {
+  CompensableInterceptor(OmegaContext context, MessageSender sender) {
+    this.context = context;
     this.sender = sender;
   }
 
   @Override
-  public void preIntercept(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, message));
+  public void preIntercept(String parentTxId, String compensationMethod, 
Object... message) {
+    sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod, message));
   }
 
   @Override
-  public void postIntercept(String globalTxId, String localTxId, String 
parentTxId, String compensationMethod) {
-    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod));
+  public void postIntercept(String parentTxId, String compensationMethod) {
+    sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod));
 
   }
 
   @Override
-  public void onError(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, throwable));
+  public void onError(String parentTxId, String compensationMethod, Throwable 
throwable) {
+    sender.send(
+        new TxAbortedEvent(context.globalTxId(), context.localTxId(), 
parentTxId, compensationMethod, throwable));
   }
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 9be92e6..291538f 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -18,9 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod, Object... message);
+  EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, 
Object... message) {
+    }
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod);
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+    }
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String 
compensationMethod, Throwable throwable);
+    @Override
+    public void onError(String parentTxId, String compensationMethod, 
Throwable throwable) {
+    }
+  };
+
+  void preIntercept(String parentTxId, String compensationMethod, Object... 
message);
+
+  void postIntercept(String parentTxId, String compensationMethod);
+
+  void onError(String parentTxId, String compensationMethod, Throwable 
throwable);
 }
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..d633630
--- /dev/null
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeoutException;
+
+class TimeAwareInterceptor implements EventAwareInterceptor {
+  private final EventAwareInterceptor interceptor;
+  private final BlockingDeque<EventAwareInterceptor> interceptors = new 
LinkedBlockingDeque<>(2);
+
+  TimeAwareInterceptor(EventAwareInterceptor interceptor) {
+    this.interceptor = interceptor;
+    this.interceptors.offer(interceptor);
+  }
+
+  @Override
+  public void preIntercept(String localTxId, String signature, Object... args) 
{
+    interceptor.preIntercept(localTxId, signature, args);
+  }
+
+  @Override
+  public void postIntercept(String localTxId, String signature) {
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().postIntercept(localTxId, signature);
+  }
+
+  @Override
+  public void onError(String localTxId, String signature, Throwable throwable) 
{
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().onError(localTxId, signature, throwable);
+  }
+
+  void onTimeout(String signature, String localTxId) {
+    interceptors.offerFirst(NO_OP_INTERCEPTOR);
+    interceptors.pollLast().onError(localTxId, signature, new 
TimeoutException());
+  }
+}
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index a447489..9bee829 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,8 +17,12 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
@@ -34,11 +38,12 @@ public class TransactionAspect {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
-  private final EventAwareInterceptor interceptor;
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final CompensableInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
     this.context = context;
-    this.interceptor = new CompensableInterceptor(sender);
+    this.interceptor = new CompensableInterceptor(context, sender);
   }
 
   
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable
 * *(..)) && @annotation(compensable)")
@@ -49,17 +54,22 @@ public class TransactionAspect {
     String signature = compensationMethodSignature(joinPoint, compensable, 
method);
 
     String localTxId = context.localTxId();
+    context.newLocalTxId();
 
-    preIntercept(joinPoint, signature, localTxId);
+    TimeAwareInterceptor interceptor = new 
TimeAwareInterceptor(this.interceptor);
+    interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, 
method.toString());
+    if (compensable.timeout() > 0) {
+      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), 
compensable.timeout(), MILLISECONDS);
+    }
 
     try {
       Object result = joinPoint.proceed();
-      postIntercept(signature, localTxId);
+      interceptor.postIntercept(localTxId, signature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptException(signature, throwable, localTxId);
+      interceptor.onError(localTxId, signature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
@@ -75,30 +85,4 @@ public class TransactionAspect {
         .getDeclaredMethod(compensable.compensationMethod(), 
method.getParameterTypes())
         .toString();
   }
-
-  private void preIntercept(ProceedingJoinPoint joinPoint, String signature, 
String parentTxId) {
-    interceptor.preIntercept(
-        context.globalTxId(),
-        context.newLocalTxId(),
-        parentTxId,
-        signature,
-        joinPoint.getArgs());
-  }
-
-  private void postIntercept(String signature, String parentTxId) {
-    interceptor.postIntercept(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature);
-  }
-
-  private void interceptException(String signature, Throwable throwable, 
String parentTxId) {
-    interceptor.onError(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature,
-        throwable);
-  }
 }
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 7505a1f..609ea6d 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -27,7 +27,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class CompensableInterceptorTest {
 
@@ -41,11 +45,20 @@ public class CompensableInterceptorTest {
   private final String message = uniquify("message");
   private final String compensationMethod = getClass().getCanonicalName();
 
-  private final CompensableInterceptor interceptor = new 
CompensableInterceptor(sender);
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = 
Mockito.mock(IdGenerator.class);
+  private final OmegaContext context = new OmegaContext(idGenerator);
+  private final CompensableInterceptor interceptor = new 
CompensableInterceptor(context, sender);
+
+  @Before
+  public void setUp() throws Exception {
+    context.setGlobalTxId(globalTxId);
+    context.setLocalTxId(localTxId);
+  }
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(globalTxId, localTxId, parentTxId, 
compensationMethod, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, message);
 
     TxEvent event = messages.get(0);
 
@@ -59,7 +72,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxEndedEventAfter() throws Exception {
-    interceptor.postIntercept(globalTxId, localTxId, parentTxId, 
compensationMethod);
+    interceptor.postIntercept(parentTxId, compensationMethod);
 
     TxEvent event = messages.get(0);
 
@@ -73,7 +86,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxAbortedEventOnError() throws Exception {
-    interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, 
new RuntimeException("oops"));
+    interceptor.onError(parentTxId, compensationMethod, new 
RuntimeException("oops"));
 
     TxEvent event = messages.get(0);
 
diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
similarity index 68%
copy from 
omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
copy to 
omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 9be92e6..fb2ee1d 100644
--- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod, Object... message);
+import static org.junit.Assert.*;
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, 
String compensationMethod);
+public class TimeAwareInterceptorTest {
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String 
compensationMethod, Throwable throwable);
-}
+}
\ No newline at end of file
diff --git 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 76a0e34..2ce34b8 100644
--- 
a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ 
b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,13 +18,18 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -55,6 +60,7 @@ public class TransactionAspectTest {
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
@@ -67,8 +73,6 @@ public class TransactionAspectTest {
 
   @Test
   public void newLocalTxIdInCompensable() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
-
     aspect.advise(joinPoint, compensable);
 
     TxEvent startedEvent = messages.get(0);
@@ -91,7 +95,6 @@ public class TransactionAspectTest {
 
   @Test
   public void restoreContextOnCompensableError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
@@ -114,6 +117,41 @@ public class TransactionAspectTest {
     assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(compensable.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(newLocalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, compensable);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(newLocalTxId));
+    assertThat(event.parentTxId(), is(localTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> 
localTxId.equals(omegaContext.localTxId()));
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.

Reply via email to