This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 0aaa3fde7da0b56ab109140576179cff1a70d9ea Author: seanyinx <sean....@huawei.com> AuthorDate: Wed Jan 10 21:07:30 2018 +0800 SCB-212 tx timeout impl Signed-off-by: seanyinx <sean....@huawei.com> --- omega/omega-transaction/pom.xml | 4 ++ .../omega/transaction/CompensableInterceptor.java | 19 +++++--- .../omega/transaction/EventAwareInterceptor.java | 20 ++++++-- .../omega/transaction/TimeAwareInterceptor.java | 54 ++++++++++++++++++++++ .../saga/omega/transaction/TransactionAspect.java | 46 ++++++------------ .../transaction/CompensableInterceptorTest.java | 21 +++++++-- .../transaction/TimeAwareInterceptorTest.java} | 8 ++-- .../omega/transaction/TransactionAspectTest.java | 44 ++++++++++++++++-- 8 files changed, 163 insertions(+), 53 deletions(-) diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml index 90c00e9..1829650 100644 --- a/omega/omega-transaction/pom.xml +++ b/omega/omega-transaction/pom.xml @@ -59,6 +59,10 @@ <groupId>com.github.seanyinx</groupId> <artifactId>unit-scaffolding</artifactId> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java index b443c4d..76193cd 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java @@ -17,26 +17,31 @@ package org.apache.servicecomb.saga.omega.transaction; +import org.apache.servicecomb.saga.omega.context.OmegaContext; + class CompensableInterceptor implements EventAwareInterceptor { + private final OmegaContext context; private final MessageSender sender; - CompensableInterceptor(MessageSender sender) { + CompensableInterceptor(OmegaContext context, MessageSender sender) { + this.context = context; this.sender = sender; } @Override - public void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) { - sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message)); + public void preIntercept(String parentTxId, String compensationMethod, Object... message) { + sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message)); } @Override - public void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod)); + public void postIntercept(String parentTxId, String compensationMethod) { + sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod)); } @Override - public void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { - sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable)); + public void onError(String parentTxId, String compensationMethod, Throwable throwable) { + sender.send( + new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable)); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java index 9be92e6..291538f 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java @@ -18,9 +18,23 @@ package org.apache.servicecomb.saga.omega.transaction; public interface EventAwareInterceptor { - void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message); + EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() { + @Override + public void preIntercept(String parentTxId, String compensationMethod, Object... message) { + } - void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod); + @Override + public void postIntercept(String parentTxId, String compensationMethod) { + } - void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable); + @Override + public void onError(String parentTxId, String compensationMethod, Throwable throwable) { + } + }; + + void preIntercept(String parentTxId, String compensationMethod, Object... message); + + void postIntercept(String parentTxId, String compensationMethod); + + void onError(String parentTxId, String compensationMethod, Throwable throwable); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java new file mode 100644 index 0000000..d633630 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.transaction; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeoutException; + +class TimeAwareInterceptor implements EventAwareInterceptor { + private final EventAwareInterceptor interceptor; + private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2); + + TimeAwareInterceptor(EventAwareInterceptor interceptor) { + this.interceptor = interceptor; + this.interceptors.offer(interceptor); + } + + @Override + public void preIntercept(String localTxId, String signature, Object... args) { + interceptor.preIntercept(localTxId, signature, args); + } + + @Override + public void postIntercept(String localTxId, String signature) { + interceptors.offerLast(NO_OP_INTERCEPTOR); + interceptors.pollFirst().postIntercept(localTxId, signature); + } + + @Override + public void onError(String localTxId, String signature, Throwable throwable) { + interceptors.offerLast(NO_OP_INTERCEPTOR); + interceptors.pollFirst().onError(localTxId, signature, throwable); + } + + void onTimeout(String signature, String localTxId) { + interceptors.offerFirst(NO_OP_INTERCEPTOR); + interceptors.pollLast().onError(localTxId, signature, new TimeoutException()); + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index a447489..9bee829 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -17,8 +17,12 @@ package org.apache.servicecomb.saga.omega.transaction; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable; @@ -34,11 +38,12 @@ public class TransactionAspect { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final OmegaContext context; - private final EventAwareInterceptor interceptor; + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final CompensableInterceptor interceptor; public TransactionAspect(MessageSender sender, OmegaContext context) { this.context = context; - this.interceptor = new CompensableInterceptor(sender); + this.interceptor = new CompensableInterceptor(context, sender); } @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)") @@ -49,17 +54,22 @@ public class TransactionAspect { String signature = compensationMethodSignature(joinPoint, compensable, method); String localTxId = context.localTxId(); + context.newLocalTxId(); - preIntercept(joinPoint, signature, localTxId); + TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor); + interceptor.preIntercept(localTxId, signature, joinPoint.getArgs()); LOG.debug("Updated context {} for compensable method {} ", context, method.toString()); + if (compensable.timeout() > 0) { + executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS); + } try { Object result = joinPoint.proceed(); - postIntercept(signature, localTxId); + interceptor.postIntercept(localTxId, signature); return result; } catch (Throwable throwable) { - interceptException(signature, throwable, localTxId); + interceptor.onError(localTxId, signature, throwable); throw throwable; } finally { context.setLocalTxId(localTxId); @@ -75,30 +85,4 @@ public class TransactionAspect { .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes()) .toString(); } - - private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) { - interceptor.preIntercept( - context.globalTxId(), - context.newLocalTxId(), - parentTxId, - signature, - joinPoint.getArgs()); - } - - private void postIntercept(String signature, String parentTxId) { - interceptor.postIntercept( - context.globalTxId(), - context.localTxId(), - parentTxId, - signature); - } - - private void interceptException(String signature, Throwable throwable, String parentTxId) { - interceptor.onError( - context.globalTxId(), - context.localTxId(), - parentTxId, - signature, - throwable); - } } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java index 7505a1f..609ea6d 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java @@ -27,7 +27,11 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.servicecomb.saga.omega.context.IdGenerator; +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class CompensableInterceptorTest { @@ -41,11 +45,20 @@ public class CompensableInterceptorTest { private final String message = uniquify("message"); private final String compensationMethod = getClass().getCanonicalName(); - private final CompensableInterceptor interceptor = new CompensableInterceptor(sender); + @SuppressWarnings("unchecked") + private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class); + private final OmegaContext context = new OmegaContext(idGenerator); + private final CompensableInterceptor interceptor = new CompensableInterceptor(context, sender); + + @Before + public void setUp() throws Exception { + context.setGlobalTxId(globalTxId); + context.setLocalTxId(localTxId); + } @Test public void sendsTxStartedEventBefore() throws Exception { - interceptor.preIntercept(globalTxId, localTxId, parentTxId, compensationMethod, message); + interceptor.preIntercept(parentTxId, compensationMethod, message); TxEvent event = messages.get(0); @@ -59,7 +72,7 @@ public class CompensableInterceptorTest { @Test public void sendsTxEndedEventAfter() throws Exception { - interceptor.postIntercept(globalTxId, localTxId, parentTxId, compensationMethod); + interceptor.postIntercept(parentTxId, compensationMethod); TxEvent event = messages.get(0); @@ -73,7 +86,7 @@ public class CompensableInterceptorTest { @Test public void sendsTxAbortedEventOnError() throws Exception { - interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops")); + interceptor.onError(parentTxId, compensationMethod, new RuntimeException("oops")); TxEvent event = messages.get(0); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java similarity index 68% copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java index 9be92e6..fb2ee1d 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java @@ -17,10 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; -public interface EventAwareInterceptor { - void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message); +import static org.junit.Assert.*; - void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod); +public class TimeAwareInterceptorTest { - void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable); -} +} \ No newline at end of file diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java index 76a0e34..2ce34b8 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java @@ -18,13 +18,18 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; @@ -55,6 +60,7 @@ public class TransactionAspectTest { @Before public void setUp() throws Exception { + when(idGenerator.nextId()).thenReturn(newLocalTxId); when(joinPoint.getSignature()).thenReturn(methodSignature); when(joinPoint.getTarget()).thenReturn(this); @@ -67,8 +73,6 @@ public class TransactionAspectTest { @Test public void newLocalTxIdInCompensable() throws Throwable { - when(idGenerator.nextId()).thenReturn(newLocalTxId); - aspect.advise(joinPoint, compensable); TxEvent startedEvent = messages.get(0); @@ -91,7 +95,6 @@ public class TransactionAspectTest { @Test public void restoreContextOnCompensableError() throws Throwable { - when(idGenerator.nextId()).thenReturn(newLocalTxId); RuntimeException oops = new RuntimeException("oops"); when(joinPoint.proceed()).thenThrow(oops); @@ -114,6 +117,41 @@ public class TransactionAspectTest { assertThat(omegaContext.localTxId(), is(localTxId)); } + @Test + public void sendsAbortEventOnTimeout() throws Throwable { + CountDownLatch latch = new CountDownLatch(1); + when(compensable.timeout()).thenReturn(100); + when(joinPoint.proceed()).thenAnswer(invocationOnMock -> { + latch.await(); + assertThat(omegaContext.localTxId(), is(newLocalTxId)); + return null; + }); + + CompletableFuture.runAsync(() -> { + try { + aspect.advise(joinPoint, compensable); + } catch (Throwable throwable) { + fail(throwable.getMessage()); + } + }); + + await().atMost(1, SECONDS).until(() -> messages.size() == 2); + + TxEvent event = messages.get(1); + + assertThat(event.globalTxId(), is(globalTxId)); + assertThat(event.localTxId(), is(newLocalTxId)); + assertThat(event.parentTxId(), is(localTxId)); + assertThat(event.type(), is("TxAbortedEvent")); + + latch.countDown(); + + await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId())); + + // no redundant ended message received + assertThat(messages.size(), is(2)); + } + private String doNothing() { return "doNothing"; } -- To stop receiving notification emails like this one, please contact "commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.