This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 887b6d8f6901deeaba6577a166f30a83d949f4a2
Author: seanyinx <sean....@huawei.com>
AuthorDate: Wed Jan 3 17:47:05 2018 +0800

    SCB-100 delegated executors to proxy
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 .../spring/CompensableAnnotationProcessor.java     |   5 +
 .../transaction/spring/ExecutorFieldCallback.java  | 163 +++++++++++++++++++++
 .../spring/annotations/OmegaContextAware.java      |  30 ++++
 .../spring/CompensableAnnotationCheckingTest.java  |  15 ++
 .../spring/MisconfiguredAnnotation.java            |  29 ++++
 .../spring/TransactionInterceptionTest.java        |  57 ++++---
 6 files changed, 278 insertions(+), 21 deletions(-)

diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 9cb5b27..87f9049 100644
--- 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -33,6 +33,7 @@ class CompensableAnnotationProcessor implements 
BeanPostProcessor {
   @Override
   public Object postProcessBeforeInitialization(Object bean, String beanName) 
throws BeansException {
     checkMethod(bean);
+    checkFields(bean);
     return bean;
   }
 
@@ -44,4 +45,8 @@ class CompensableAnnotationProcessor implements 
BeanPostProcessor {
   private void checkMethod(Object bean) {
     ReflectionUtils.doWithMethods(bean.getClass(), new 
CompensableMethodCheckingCallback(bean, omegaContext));
   }
+
+  private void checkFields(Object bean) {
+    ReflectionUtils.doWithFields(bean.getClass(), new 
ExecutorFieldCallback(bean, omegaContext));
+  }
 }
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
new file mode 100644
index 0000000..3030fca
--- /dev/null
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import 
org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.ReflectionUtils.FieldCallback;
+
+/**
+ * Field callback that swaps every {@link OmegaContextAware}-annotated executor field of a bean
+ * for a JDK dynamic proxy. The proxy wraps each {@link Runnable}/{@link Callable} handed to the
+ * executor so that the transaction ids read from the {@link OmegaContext} at submission time are
+ * set on the context again when the task is invoked, and cleared afterwards.
+ */
+class ExecutorFieldCallback implements FieldCallback {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OmegaContext omegaContext;
+  private final Object bean;
+
+  /**
+   * @param bean the bean instance whose fields are inspected and, if annotated, replaced
+   * @param omegaContext the context whose transaction ids are captured and restored around tasks
+   */
+  ExecutorFieldCallback(Object bean, OmegaContext omegaContext) {
+    this.omegaContext = omegaContext;
+    this.bean = bean;
+  }
+
+  /**
+   * Replaces the value of an {@link OmegaContextAware}-annotated field with a context-propagating
+   * proxy; fields without the annotation are left untouched.
+   *
+   * @param field the field currently being visited
+   * @throws IllegalArgumentException if the annotated field's type is not assignable to
+   *     {@link Executor}
+   * @throws IllegalAccessException if the field cannot be read or written reflectively
+   */
+  @Override
+  public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
+    if (!field.isAnnotationPresent(OmegaContextAware.class)) {
+      return;
+    }
+
+    ReflectionUtils.makeAccessible(field);
+
+    Class<?> fieldType = field.getType();
+
+    if (!Executor.class.isAssignableFrom(fieldType)) {
+      throw new IllegalArgumentException(
+          "Only Executor, ExecutorService, and ScheduledExecutorService are supported for @"
+              + OmegaContextAware.class.getSimpleName());
+    }
+
+    field.set(bean, ExecutorProxy.newInstance(field.get(bean), fieldType, omegaContext));
+  }
+
+  /**
+   * Invokes {@code method} on {@code target} and rethrows the target's own exception instead of
+   * the reflective {@code InvocationTargetException} wrapper, so callers of the proxy observe
+   * the same exception types they would get from a direct call.
+   *
+   * @param args the call arguments; may be {@code null} for methods without parameters
+   */
+  private static Object invokeUnwrapped(Method method, Object target, Object[] args) throws Throwable {
+    try {
+      return method.invoke(target, args);
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Throwable cause = e.getCause();
+      throw cause != null ? cause : e;
+    }
+  }
+
+  /**
+   * Invocation handler around a submitted task. The transaction ids are captured when the handler
+   * is constructed and written back to the {@link OmegaContext} around each task invocation.
+   */
+  private static class RunnableProxy implements InvocationHandler {
+
+    private final String globalTxId;
+    private final String localTxId;
+    private final String parentTxId;
+    private final Object runnable;
+    private final OmegaContext omegaContext;
+
+    /**
+     * Creates a proxy implementing the interfaces declared directly on the task's class.
+     * NOTE(review): {@code Class.getInterfaces()} does not include interfaces inherited from a
+     * superclass, so a task that is only a Runnable/Callable through its superclass yields a
+     * proxy without that interface - confirm whether such tasks need to be supported.
+     */
+    private static Object newInstance(Object runnable, OmegaContext omegaContext) {
+      RunnableProxy runnableProxy = new RunnableProxy(omegaContext, runnable);
+      return Proxy.newProxyInstance(
+          runnable.getClass().getClassLoader(),
+          runnable.getClass().getInterfaces(),
+          runnableProxy);
+    }
+
+    private RunnableProxy(OmegaContext omegaContext, Object runnable) {
+      this.omegaContext = omegaContext;
+      this.globalTxId = omegaContext.globalTxId();
+      this.localTxId = omegaContext.localTxId();
+      this.parentTxId = omegaContext.parentTxId();
+      this.runnable = runnable;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      // equals/hashCode/toString are also routed through this handler and may be called on the
+      // submitting thread (logging, collections); they must not set or clear that thread's context.
+      if (method.getDeclaringClass() == Object.class) {
+        return invokeUnwrapped(method, runnable, args);
+      }
+
+      try {
+        LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+            globalTxId,
+            localTxId,
+            parentTxId);
+
+        omegaContext.setGlobalTxId(globalTxId);
+        omegaContext.setLocalTxId(localTxId);
+        omegaContext.setParentTxId(parentTxId);
+
+        return invokeUnwrapped(method, runnable, args);
+      } finally {
+        omegaContext.clear();
+        LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+            globalTxId,
+            localTxId,
+            parentTxId);
+      }
+    }
+  }
+
+  /**
+   * Invocation handler around the real executor: every Runnable/Callable argument (or non-empty
+   * collection whose first element is one) is replaced by a {@link RunnableProxy} before the
+   * call is forwarded to the target.
+   */
+  private static class ExecutorProxy implements InvocationHandler {
+    private final Object target;
+    private final OmegaContext omegaContext;
+
+    private ExecutorProxy(Object target, OmegaContext omegaContext) {
+      this.target = target;
+      this.omegaContext = omegaContext;
+    }
+
+    /**
+     * Creates the executor proxy. For an interface-typed field the proxy implements exactly that
+     * interface; otherwise it implements the interfaces declared directly on the field's class.
+     */
+    private static Object newInstance(Object target, Class<?> targetClass, OmegaContext omegaContext) {
+      Class<?>[] interfaces = targetClass.isInterface() ? new Class<?>[] {targetClass} : targetClass.getInterfaces();
+
+      return Proxy.newProxyInstance(
+          targetClass.getClassLoader(),
+          interfaces,
+          new ExecutorProxy(target, omegaContext));
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      // The JDK passes null (not an empty array) for methods without parameters, e.g. shutdown().
+      if (args == null) {
+        return invokeUnwrapped(method, target, null);
+      }
+
+      Object[] augmentedArgs = new Object[args.length];
+
+      for (int i = 0; i < args.length; i++) {
+        Object arg = args[i];
+        if (isExecutable(arg)) {
+          augmentedArgs[i] = RunnableProxy.newInstance(arg, omegaContext);
+        } else if (isCollectionOfExecutables(arg)) {
+          augmentedArgs[i] = ((Collection<?>) arg)
+              .stream()
+              .map(a -> RunnableProxy.newInstance(a, omegaContext))
+              .collect(Collectors.toList());
+        } else {
+          augmentedArgs[i] = arg;
+        }
+      }
+
+      return invokeUnwrapped(method, target, augmentedArgs);
+    }
+
+    private boolean isExecutable(Object arg) {
+      return arg instanceof Runnable || arg instanceof Callable;
+    }
+
+    // Only the first element is inspected; an empty collection is forwarded unchanged.
+    private boolean isCollectionOfExecutables(Object arg) {
+      return arg instanceof Collection
+          && !((Collection<?>) arg).isEmpty()
+          && isExecutable(((Collection<?>) arg).iterator().next());
+    }
+  }
+}
diff --git 
a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
new file mode 100644
index 0000000..5a4e7e4
--- /dev/null
+++ 
b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring.annotations;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation, retained at runtime, for executor members that should propagate the omega
+ * transaction context to the tasks they run. ExecutorFieldCallback replaces annotated fields of
+ * type Executor (or a subtype) with a context-propagating proxy and rejects any other field type.
+ *
+ * NOTE(review): METHOD is an allowed target, but the only processing visible alongside this
+ * annotation inspects fields - confirm that method-level usage is handled elsewhere.
+ */
+@Retention(RUNTIME)
+@Target({FIELD, METHOD})
+public @interface OmegaContextAware {
+}
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
index 3b399b9..d2abfc9 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import org.junit.Test;
@@ -39,4 +40,18 @@ public class CompensableAnnotationCheckingTest {
       assertThat(e.getCause().getMessage(), startsWith("No such compensation 
method [none]"));
     }
   }
+
+  // Starts the application with the "omega-context-aware-checking" profile and expects bean
+  // creation to fail because @OmegaContextAware is placed on a field of an unsupported type.
+  @Test
+  public void blowsUpWhenAnnotationOnWrongType() throws Exception {
+    try {
+      try (ConfigurableApplicationContext ignored = new 
SpringApplicationBuilder(TransactionTestMain.class)
+          .profiles("omega-context-aware-checking")
+          .run()) {
+        // Reached only if the context started; presumably fails the test - see expectFailing.
+        expectFailing(BeanCreationException.class);
+      }
+    } catch (BeanCreationException e) {
+      // The cause is expected to carry the message raised for the unsupported field type.
+      assertThat(e.getCause().getMessage(),
+          is("Only Executor, ExecutorService, and ScheduledExecutorService are 
supported for @OmegaContextAware"));
+    }
+  }
 }
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
new file mode 100644
index 0000000..99459a5
--- /dev/null
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import 
org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+/**
+ * Test fixture, active only under the "omega-context-aware-checking" profile, that deliberately
+ * puts {@code @OmegaContextAware} on a {@code User} field so that the annotation's type check
+ * rejects it while the bean is being created.
+ */
+@Configuration
+@Profile("omega-context-aware-checking")
+class MisconfiguredAnnotation {
+  // Intentionally not an Executor: the annotation is only supported on executor-typed fields.
+  @OmegaContextAware
+  private final User user = new User();
+}
diff --git 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6de3d18..a2b7641 100644
--- 
a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ 
b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 import static akka.actor.ActorRef.noSender;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -44,6 +44,7 @@ import 
org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
 import 
org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
+import 
org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -67,13 +68,21 @@ import akka.actor.Props;
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
-  private static final ExecutorService executor = 
Executors.newSingleThreadExecutor();
   private static final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
+  private final User user = new User(username, email);
+  private final User illegalUser = new User(ILLEGAL_USER, email);
+
+  private final String usernameJack = uniquify("Jack");
+  private final User jack = new User(usernameJack, uniquify("j...@gmail.com"));
+
+  @OmegaContextAware
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Autowired
   private List<String> messages;
 
@@ -108,12 +117,11 @@ public class TransactionInterceptionTest {
 
   @AfterClass
   public static void afterClass() throws Exception {
-    executor.shutdown();
   }
 
   @Test
   public void sendsUserToRemote_AroundTransaction() throws Exception {
-    User user = userService.add(new User(username, email));
+    User user = userService.add(this.user);
 
     assertArrayEquals(
         new String[]{
@@ -129,9 +137,8 @@ public class TransactionInterceptionTest {
   @Test
   public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
     Throwable throwable = null;
-    User user = new User(ILLEGAL_USER, email);
     try {
-      userService.add(user);
+      userService.add(illegalUser);
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException ignored) {
       throwable = ignored;
@@ -139,7 +146,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -147,11 +154,11 @@ public class TransactionInterceptionTest {
 
   @Test
   public void compensateOnTransactionException() throws Exception {
-    User user = userService.add(new User(username, email));
+    User user = userService.add(this.user);
 
     // another sub transaction to the same service within the same global 
transaction
     String localTxId = omegaContext.newLocalTxId();
-    User anotherUser = userService.add(new User(uniquify("Jack"), 
uniquify("j...@gmail.com")));
+    User anotherUser = userService.add(jack);
 
     messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, 
compensationMethod, user);
     messageHandler.onReceive(globalTxId, localTxId, parentTxId, 
compensationMethod, anotherUser);
@@ -174,38 +181,44 @@ public class TransactionInterceptionTest {
 
   @Test
   public void passesOmegaContextThroughDifferentThreads() throws Exception {
-    User user = new User(username, email);
     new Thread(() -> userService.add(user)).start();
+    waitTillSavedUser(username);
 
-    await().atMost(500, MILLISECONDS).until(() -> 
userRepository.findByUsername(username) != null);
+    String newLocalTxId = omegaContext.newLocalTxId();
+    new Thread(() -> userService.add(jack)).start();
+    waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod).toString()},
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, 
compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
   }
 
   @Test
   public void passesOmegaContextInThreadPool() throws Exception {
-    User user = new User(username, email);
     executor.execute(() -> userService.add(user));
+    waitTillSavedUser(username);
 
-    await().atMost(500, MILLISECONDS).until(() -> 
userRepository.findByUsername(username) != null);
+    String newLocalTxId = omegaContext.newLocalTxId();
+    executor.invokeAll(singletonList(() -> userService.add(jack)));
+    waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod).toString()},
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, 
compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, 
compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, 
compensationMethod).toString()},
         toArray(messages)
     );
   }
 
   @Test
   public void passesOmegaContextThroughReactiveX() throws Exception {
-    User user = new User(username, email);
-
     Flowable.just(user)
         .parallel()
         .runOn(Schedulers.io())
@@ -213,7 +226,7 @@ public class TransactionInterceptionTest {
         .sequential()
         .subscribe();
 
-    await().atMost(500, MILLISECONDS).until(() -> 
userRepository.findByUsername(username) != null);
+    waitTillSavedUser(username);
 
     assertArrayEquals(
         new String[]{
@@ -228,12 +241,10 @@ public class TransactionInterceptionTest {
     // TODO: 2018/1/3 if actor system starts before omega context initialized, 
this test will fail
     ActorSystem actorSystem = ActorSystem.create();
 
-    User user = new User(username, email);
-
     ActorRef actorRef = 
actorSystem.actorOf(UserServiceActor.props(userService));
     actorRef.tell(user, noSender());
 
-    await().atMost(1, SECONDS).until(() -> 
userRepository.findByUsername(username) != null);
+    waitTillSavedUser(username);
 
     assertArrayEquals(
         new String[] {
@@ -245,6 +256,10 @@ public class TransactionInterceptionTest {
     actorSystem.terminate();
   }
 
+  private void waitTillSavedUser(String username) {
+    await().atMost(500, MILLISECONDS).until(() -> 
userRepository.findByUsername(username) != null);
+  }
+
   private static class UserServiceActor extends AbstractLoggingActor {
     private final TransactionalUserService userService;
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.

Reply via email to