This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e2e63f3cafe25a4ce4da26fe2a8e8dd6d94bc5da Author: seanyinx <sean....@huawei.com> AuthorDate: Wed Jan 3 11:25:45 2018 +0800 SCB-100 async support for omega context Signed-off-by: seanyinx <sean....@huawei.com> --- .../saga/omega/context/OmegaContext.java | 6 +- omega/omega-spring-tx/pom.xml | 4 ++ .../spring/TransactionInterceptionTest.java | 71 ++++++++++++++++------ .../omega/transaction/spring/UserRepository.java | 1 + 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java index 6016b53..f336c4c 100644 --- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java @@ -31,9 +31,9 @@ public class OmegaContext { public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id"; public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id"; - private final ThreadLocal<String> globalTxId = new ThreadLocal<>(); - private final ThreadLocal<String> localTxId = new ThreadLocal<>(); - private final ThreadLocal<String> parentTxId = new ThreadLocal<>(); + private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>(); + private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>(); + private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>(); private final IdGenerator<String> idGenerator; private final Map<String, CompensationContext> compensationContexts = new HashMap<>(); diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml index e5fc8b3..21661bb 100644 --- a/omega/omega-spring-tx/pom.xml +++ b/omega/omega-spring-tx/pom.xml @@ -72,6 +72,10 @@ <artifactId>spring-boot-starter-data-jpa</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index dc23612..9ace53f 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -19,7 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction.spring; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertArrayEquals; @@ -28,8 +30,20 @@ import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator; +import org.apache.servicecomb.saga.omega.transaction.MessageHandler; +import org.apache.servicecomb.saga.omega.transaction.MessageSender; +import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent; +import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent; +import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent; +import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -40,22 +54,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.test.context.junit4.SpringRunner; -import org.apache.servicecomb.saga.omega.context.OmegaContext; -import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator; -import org.apache.servicecomb.saga.omega.transaction.MessageHandler; -import org.apache.servicecomb.saga.omega.transaction.MessageSender; -import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent; -import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent; -import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent; -import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent; -import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig; - @RunWith(SpringRunner.class) @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class}) @AutoConfigureMockMvc public class TransactionInterceptionTest { - private static final String TX_STARTED_EVENT = "TxStartedEvent"; - private static final String TX_ENDED_EVENT = "TxEndedEvent"; + private static final ExecutorService executor = Executors.newSingleThreadExecutor(); private static final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); @@ -77,11 +80,14 @@ public class TransactionInterceptionTest { @Autowired private MessageHandler messageHandler; + private String compensationMethod; + @Before public void setUp() throws Exception { omegaContext.setGlobalTxId(globalTxId); omegaContext.setLocalTxId(localTxId); omegaContext.setParentTxId(parentTxId); + compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); } @After @@ -89,12 +95,15 @@ public class TransactionInterceptionTest { messages.clear(); } + @AfterClass + public static void afterClass() throws Exception { + executor.shutdown(); + } + @Test public void sendsUserToRemote_AroundTransaction() throws Exception { User user = userService.add(new User(username, email)); - String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - assertArrayEquals( new String[]{ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), @@ -117,8 +126,6 @@ public class TransactionInterceptionTest { throwable = ignored; } - String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - assertArrayEquals( new String[]{ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), @@ -135,8 +142,6 @@ public class TransactionInterceptionTest { String localTxId = omegaContext.newLocalTxId(); User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("j...@gmail.com"))); - String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user); messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser); @@ -156,6 +161,36 @@ public class TransactionInterceptionTest { ); } + @Test + public void passesOmegaContextThroughDifferentThreads() throws Exception { + User user = new User(username, email); + new Thread(() -> userService.add(user)).start(); + + await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + + assertArrayEquals( + new String[]{ + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()}, + toArray(messages) + ); + } + + @Test + public void passesOmegaContextInThreadPool() throws Exception { + User user = new User(username, email); + executor.execute(() -> userService.add(user)); + + await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null); + + assertArrayEquals( + new String[]{ + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()}, + toArray(messages) + ); + } + private String[] toArray(List<String> messages) { return messages.toArray(new String[messages.size()]); } diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java index bcf3e14..729b7ab 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java @@ -20,4 +20,5 @@ package org.apache.servicecomb.saga.omega.transaction.spring; import org.springframework.data.repository.CrudRepository; interface UserRepository extends CrudRepository<User, Long> { + User findByUsername(String username); } -- To stop receiving notification emails like this one, please contact "commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.