This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 887b6d8f6901deeaba6577a166f30a83d949f4a2 Author: seanyinx <sean....@huawei.com> AuthorDate: Wed Jan 3 17:47:05 2018 +0800 SCB-100 delegated executors to proxy Signed-off-by: seanyinx <sean....@huawei.com> --- .../spring/CompensableAnnotationProcessor.java | 5 + .../transaction/spring/ExecutorFieldCallback.java | 163 +++++++++++++++++++++ .../spring/annotations/OmegaContextAware.java | 30 ++++ .../spring/CompensableAnnotationCheckingTest.java | 15 ++ .../spring/MisconfiguredAnnotation.java | 29 ++++ .../spring/TransactionInterceptionTest.java | 57 ++++--- 6 files changed, 278 insertions(+), 21 deletions(-) diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java index 9cb5b27..87f9049 100644 --- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java +++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java @@ -33,6 +33,7 @@ class CompensableAnnotationProcessor implements BeanPostProcessor { @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { checkMethod(bean); + checkFields(bean); return bean; } @@ -44,4 +45,8 @@ class CompensableAnnotationProcessor implements BeanPostProcessor { private void checkMethod(Object bean) { ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext)); } + + private void checkFields(Object bean) { + ReflectionUtils.doWithFields(bean.getClass(), new ExecutorFieldCallback(bean, omegaContext)); + } } diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java new file mode 100644 index 0000000..3030fca --- /dev/null +++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.transaction.spring; + +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.ReflectionUtils.FieldCallback; + +class ExecutorFieldCallback implements FieldCallback { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final OmegaContext omegaContext; + private final Object bean; + + ExecutorFieldCallback(Object bean, OmegaContext omegaContext) { + this.omegaContext = omegaContext; + this.bean = bean; + } + + @Override + public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { + if (!field.isAnnotationPresent(OmegaContextAware.class)) { + return; + } + + ReflectionUtils.makeAccessible(field); + + Class<?> generic = field.getType(); + + if (!Executor.class.isAssignableFrom(generic)) { + throw new IllegalArgumentException( + "Only Executor, ExecutorService, and ScheduledExecutorService are supported for @" + + OmegaContextAware.class.getSimpleName()); + } + + field.set(bean, ExecutorProxy.newInstance(field.get(bean), field.getType(), omegaContext)); + } + + private static class RunnableProxy implements InvocationHandler { + + private final String globalTxId; + private final String localTxId; + private final String parentTxId; + private final Object runnable; + private final OmegaContext omegaContext; + + private static Object newInstance(Object runnable, OmegaContext omegaContext) { + RunnableProxy runnableProxy = new RunnableProxy(omegaContext, runnable); + return Proxy.newProxyInstance( + runnable.getClass().getClassLoader(), + runnable.getClass().getInterfaces(), + runnableProxy); + } + + private RunnableProxy(OmegaContext omegaContext, Object runnable) { + this.omegaContext = omegaContext; + this.globalTxId = omegaContext.globalTxId(); + this.localTxId = omegaContext.localTxId(); + this.parentTxId = omegaContext.parentTxId(); + this.runnable = runnable; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]", + globalTxId, + localTxId, + parentTxId); + + omegaContext.setGlobalTxId(globalTxId); + omegaContext.setLocalTxId(localTxId); + omegaContext.setParentTxId(parentTxId); + + return method.invoke(runnable, args); + } finally { + omegaContext.clear(); + LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]", + globalTxId, + localTxId, + parentTxId); + } + } + } + + private static class ExecutorProxy implements InvocationHandler { + private final Object target; + private final OmegaContext omegaContext; + + private ExecutorProxy(Object target, OmegaContext omegaContext) { + this.target = target; + this.omegaContext = omegaContext; + } + + private static Object newInstance(Object target, Class<?> targetClass, OmegaContext omegaContext) { + Class<?>[] interfaces = targetClass.isInterface() ? new Class<?>[] {targetClass} : targetClass.getInterfaces(); + + return Proxy.newProxyInstance( + targetClass.getClassLoader(), + interfaces, + new ExecutorProxy(target, omegaContext)); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Object[] augmentedArgs = new Object[args.length]; + + for (int i = 0; i < args.length; i++) { + Object arg = args[i]; + if (isExecutable(arg)) { + augmentedArgs[i] = RunnableProxy.newInstance(arg, omegaContext); + } else if (isCollectionOfExecutables(arg)) { + augmentedArgs[i] = ((Collection<?>) arg) + .stream() + .map(a -> RunnableProxy.newInstance(a, omegaContext)) + .collect(Collectors.toList()); + } else { + augmentedArgs[i] = arg; + } + } + + return method.invoke(target, augmentedArgs); + } + + private boolean isExecutable(Object arg) { + return arg instanceof Runnable || arg instanceof Callable; + } + + private boolean isCollectionOfExecutables(Object arg) { + return arg instanceof Collection + && !((Collection<?>) arg).isEmpty() + && isExecutable(((Collection<?>) arg).iterator().next()); + } + } +} diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java new file mode 100644 index 0000000..5a4e7e4 --- /dev/null +++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.transaction.spring.annotations; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +@Retention(RUNTIME) +@Target({FIELD, METHOD}) +public @interface OmegaContextAware { +} diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java index 3b399b9..d2abfc9 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import org.junit.Test; @@ -39,4 +40,18 @@ public class CompensableAnnotationCheckingTest { assertThat(e.getCause().getMessage(), startsWith("No such compensation method [none]")); } } + + @Test + public void blowsUpWhenAnnotationOnWrongType() throws Exception { + try { + try (ConfigurableApplicationContext ignored = new SpringApplicationBuilder(TransactionTestMain.class) + .profiles("omega-context-aware-checking") + .run()) { + expectFailing(BeanCreationException.class); + } + } catch (BeanCreationException e) { + assertThat(e.getCause().getMessage(), + is("Only Executor, ExecutorService, and ScheduledExecutorService are supported for @OmegaContextAware")); + } + } } diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java new file mode 100644 index 0000000..99459a5 --- /dev/null +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.transaction.spring; + +import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; + +@Configuration +@Profile("omega-context-aware-checking") +class MisconfiguredAnnotation { + @OmegaContextAware + private final User user = new User(); +} diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 6de3d18..a2b7641 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction.spring; import static akka.actor.ActorRef.noSender; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.nullValue; @@ -44,6 +44,7 @@ import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent; import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent; import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent; import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig; +import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -67,13 +68,21 @@ import akka.actor.Props; @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class}) @AutoConfigureMockMvc public class TransactionInterceptionTest { - private static final ExecutorService executor = Executors.newSingleThreadExecutor(); private static final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); private final String username = uniquify("username"); private final String email = uniquify("email"); + private final User user = new User(username, email); + private final User illegalUser = new User(ILLEGAL_USER, email); + + private final String usernameJack = uniquify("Jack"); + private final User jack = new User(usernameJack, uniquify("j...@gmail.com")); + + @OmegaContextAware + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + @Autowired private List<String> messages; @@ -108,12 +117,11 @@ public class TransactionInterceptionTest { @AfterClass public static void afterClass() throws Exception { - executor.shutdown(); } @Test public void sendsUserToRemote_AroundTransaction() throws Exception { - User user = userService.add(new User(username, email)); + User user = userService.add(this.user); assertArrayEquals( new String[]{ @@ -129,9 +137,8 @@ public class TransactionInterceptionTest { @Test public void sendsAbortEvent_OnSubTransactionFailure() throws Exception { Throwable throwable = null; - User user = new User(ILLEGAL_USER, email); try { - userService.add(user); + userService.add(illegalUser); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException ignored) { throwable = ignored; @@ -139,7 +146,7 @@ public class TransactionInterceptionTest { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(), new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()}, toArray(messages) ); @@ -147,11 +154,11 @@ public class TransactionInterceptionTest { @Test public void compensateOnTransactionException() throws Exception { - User user = userService.add(new User(username, email)); + User user = userService.add(this.user); // another sub transaction to the same service within the same global transaction String localTxId = omegaContext.newLocalTxId(); - User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("j...@gmail.com"))); + User anotherUser = userService.add(jack); messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user); messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser); @@ -174,38 +181,44 @@ public class TransactionInterceptionTest { @Test public void passesOmegaContextThroughDifferentThreads() throws Exception { - User user = new User(username, email); new Thread(() -> userService.add(user)).start(); + waitTillSavedUser(username); - await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + String newLocalTxId = omegaContext.newLocalTxId(); + new Thread(() -> userService.add(jack)).start(); + waitTillSavedUser(usernameJack); assertArrayEquals( new String[]{ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), - new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()}, + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(), + new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()}, toArray(messages) ); } @Test public void passesOmegaContextInThreadPool() throws Exception { - User user = new User(username, email); executor.execute(() -> userService.add(user)); + waitTillSavedUser(username); - await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + String newLocalTxId = omegaContext.newLocalTxId(); + executor.invokeAll(singletonList(() -> userService.add(jack))); + waitTillSavedUser(usernameJack); assertArrayEquals( new String[]{ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), - new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()}, + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(), + new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()}, toArray(messages) ); } @Test public void passesOmegaContextThroughReactiveX() throws Exception { - User user = new User(username, email); - Flowable.just(user) .parallel() .runOn(Schedulers.io()) @@ -213,7 +226,7 @@ public class TransactionInterceptionTest { .sequential() .subscribe(); - await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + waitTillSavedUser(username); assertArrayEquals( new String[]{ @@ -228,12 +241,10 @@ public class TransactionInterceptionTest { // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail ActorSystem actorSystem = ActorSystem.create(); - User user = new User(username, email); - ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService)); actorRef.tell(user, noSender()); - await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null); + waitTillSavedUser(username); assertArrayEquals( new String[] { @@ -245,6 +256,10 @@ public class TransactionInterceptionTest { actorSystem.terminate(); } + private void waitTillSavedUser(String username) { + await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + } + private static class UserServiceActor extends AbstractLoggingActor { private final TransactionalUserService userService; -- To stop receiving notification emails like this one, please contact "commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.