This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 0373fa796d12682e488939220125ab182e6d4fa7 Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Tue Sep 10 17:33:12 2019 +0800 SCB-1368 Remove persistent queues for reliability --- ...va => DefaultTransactionRepositoryChannel.java} | 40 ++--------- .../pack/alpha/fsm/sink/SagaActorEventSender.java | 82 ---------------------- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 6 +- .../pack/alpha/fsm/SagaIntegrationTest.java | 29 ++++---- 4 files changed, 20 insertions(+), 137 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java similarity index 51% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java index ebea958..25e31d8 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java @@ -17,55 +17,23 @@ package org.apache.servicecomb.pack.alpha.fsm.repository.channel; -import java.lang.invoke.MethodHandles; -import java.util.concurrent.LinkedBlockingQueue; +import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.repository.AbstractTransactionRepositoryChannel; -import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction; import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class MemoryTransactionRepositoryChannel extends AbstractTransactionRepositoryChannel { +public class DefaultTransactionRepositoryChannel extends AbstractTransactionRepositoryChannel { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final LinkedBlockingQueue<GlobalTransaction> globalTransactionQueue; - private int size; - - public MemoryTransactionRepositoryChannel(TransactionRepository repository, int size, - MetricsService metricsService) { + public DefaultTransactionRepositoryChannel(TransactionRepository repository, MetricsService metricsService) { super(repository, metricsService); - this.size = size > 0 ? size : Integer.MAX_VALUE; - globalTransactionQueue = new LinkedBlockingQueue(this.size); - new Thread(new GlobalTransactionConsumer(), "MemoryTransactionRepositoryChannel").start(); } @Override public void sendTo(GlobalTransaction transaction) { try { - globalTransactionQueue.put(transaction); + repository.send(transaction); } catch (Exception e) { throw new RuntimeException(e); } } - - class GlobalTransactionConsumer implements Runnable { - - @Override - public void run() { - while (true) { - try { - GlobalTransaction transaction = globalTransactionQueue.peek(); - if (transaction != null) { - repository.send(transaction); - globalTransactionQueue.poll(); - } else { - Thread.sleep(10); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - } - } - } - } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java deleted file mode 100644 index 567185b..0000000 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.pack.alpha.fsm.sink; - -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.util.Timeout; -import java.lang.invoke.MethodHandles; -import java.util.concurrent.TimeUnit; - -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.apache.servicecomb.pack.alpha.fsm.SagaActor; -import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; - -public class SagaActorEventSender implements ActorEventSink { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final MetricsService metricsService; - - @Autowired - ActorSystem system; - - public SagaActorEventSender( - MetricsService metricsService) { - this.metricsService = metricsService; - } - - private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS)); - - public void send(BaseEvent event) { - long begin = System.currentTimeMillis(); - metricsService.metrics().doActorReceived(); - try{ - if (LOG.isDebugEnabled()) { - LOG.debug("send {} ", event.toString()); - } - if (event instanceof SagaStartedEvent) { - final ActorRef saga = system - .actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId()); - saga.tell(event, ActorRef.noSender()); - } else { - ActorSelection actorSelection = system - .actorSelection("/user/" + event.getGlobalTxId()); - //TODO We should leverage the async API that actor provides to send out the message - final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout); - final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration()); - saga.tell(event, ActorRef.noSender()); - } - metricsService.metrics().doActorAccepted(); - long end = System.currentTimeMillis(); - metricsService.metrics().doActorAvgTime(end - begin); - }catch (Exception ex){ - metricsService.metrics().doActorRejected(); - throw new RuntimeException(ex); - } - } -} diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index fe00de2..e32fbfe 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -33,13 +33,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; import org.apache.servicecomb.pack.alpha.core.fsm.TxState; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel; -import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel; +import org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel; import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository; import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; @@ -101,8 +100,7 @@ public class SagaActorTest { @Before public void before(){ TransactionRepository repository = new ElasticsearchTransactionRepository(template, metricsService, 0,0); - TransactionRepositoryChannel repositoryChannel = new MemoryTransactionRepositoryChannel(repository, -1, - metricsService); + TransactionRepositoryChannel repositoryChannel = new DefaultTransactionRepositoryChannel(repository, metricsService); SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel); } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 2acb135..03a89b0 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -24,11 +24,10 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; import java.util.UUID; -import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; import org.apache.servicecomb.pack.alpha.core.fsm.TxState; +import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; -import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.After; import org.junit.Test; @@ -61,7 +60,7 @@ public class SagaIntegrationTest { ActorSystem system; @Autowired - SagaActorEventSender sagaActorEventSender; + MemoryActorEventChannel memoryActorEventChannel; @Autowired MetricsService metricsService; @@ -81,7 +80,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -105,7 +104,7 @@ public class SagaIntegrationTest { final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { @@ -125,7 +124,7 @@ public class SagaIntegrationTest { final String localTxId_1 = UUID.randomUUID().toString(); final String localTxId_2 = UUID.randomUUID().toString(); SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -146,7 +145,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -168,7 +167,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -190,7 +189,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -212,7 +211,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -234,7 +233,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -257,7 +256,7 @@ public class SagaIntegrationTest { final String localTxId_3 = UUID.randomUUID().toString(); final int timeout = 5; // second SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(timeout + 2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -279,7 +278,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -301,7 +300,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); @@ -323,7 +322,7 @@ public class SagaIntegrationTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaActorEventSender.send(event); + memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();