This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0373fa796d12682e488939220125ab182e6d4fa7
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Tue Sep 10 17:33:12 2019 +0800

    SCB-1368 Remove persistent queues for reliability
---
 ...va => DefaultTransactionRepositoryChannel.java} | 40 ++---------
 .../pack/alpha/fsm/sink/SagaActorEventSender.java  | 82 ----------------------
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  6 +-
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 29 ++++----
 4 files changed, 20 insertions(+), 137 deletions(-)

diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java
similarity index 51%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java
index ebea958..25e31d8 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/MemoryTransactionRepositoryChannel.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java
@@ -17,55 +17,23 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.repository.channel;
 
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.LinkedBlockingQueue;
+import 
org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.AbstractTransactionRepositoryChannel;
-import 
org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
 import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class MemoryTransactionRepositoryChannel extends 
AbstractTransactionRepositoryChannel {
+public class DefaultTransactionRepositoryChannel extends 
AbstractTransactionRepositoryChannel {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final LinkedBlockingQueue<GlobalTransaction> globalTransactionQueue;
-  private int size;
-
-  public MemoryTransactionRepositoryChannel(TransactionRepository repository, 
int size,
-      MetricsService metricsService) {
+  public DefaultTransactionRepositoryChannel(TransactionRepository repository, 
MetricsService metricsService) {
     super(repository, metricsService);
-    this.size = size > 0 ? size : Integer.MAX_VALUE;
-    globalTransactionQueue = new LinkedBlockingQueue(this.size);
-    new Thread(new GlobalTransactionConsumer(), 
"MemoryTransactionRepositoryChannel").start();
   }
 
   @Override
   public void sendTo(GlobalTransaction transaction) {
     try {
-      globalTransactionQueue.put(transaction);
+      repository.send(transaction);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
-
-  class GlobalTransactionConsumer implements Runnable {
-
-    @Override
-    public void run() {
-      while (true) {
-        try {
-          GlobalTransaction transaction = globalTransactionQueue.peek();
-          if (transaction != null) {
-            repository.send(transaction);
-            globalTransactionQueue.poll();
-          } else {
-            Thread.sleep(10);
-          }
-        } catch (Exception ex) {
-          LOG.error(ex.getMessage(), ex);
-        }
-      }
-    }
-  }
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
deleted file mode 100644
index 567185b..0000000
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.sink;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class SagaActorEventSender implements ActorEventSink {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final MetricsService metricsService;
-
-  @Autowired
-  ActorSystem system;
-
-  public SagaActorEventSender(
-      MetricsService metricsService) {
-    this.metricsService = metricsService;
-  }
-
-  private static final Timeout lookupTimeout = new Timeout(Duration.create(1, 
TimeUnit.SECONDS));
-
-  public void send(BaseEvent event) {
-    long begin = System.currentTimeMillis();
-    metricsService.metrics().doActorReceived();
-    try{
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("send {} ", event.toString());
-      }
-      if (event instanceof SagaStartedEvent) {
-        final ActorRef saga = system
-            .actorOf(SagaActor.props(event.getGlobalTxId()), 
event.getGlobalTxId());
-        saga.tell(event, ActorRef.noSender());
-      } else {
-        ActorSelection actorSelection = system
-            .actorSelection("/user/" + event.getGlobalTxId());
-        //TODO We should leverage the async API that actor provides to send 
out the message
-        final Future<ActorRef> actorRefFuture = 
actorSelection.resolveOne(lookupTimeout);
-        final ActorRef saga = Await.result(actorRefFuture, 
lookupTimeout.duration());
-        saga.tell(event, ActorRef.noSender());
-      }
-      metricsService.metrics().doActorAccepted();
-      long end = System.currentTimeMillis();
-      metricsService.metrics().doActorAvgTime(end - begin);
-    }catch (Exception ex){
-      metricsService.metrics().doActorRejected();
-      throw new RuntimeException(ex);
-    }
-  }
-}
diff --git 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index fe00de2..e32fbfe 100644
--- 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -33,13 +33,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
-import 
org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
+import 
org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
 import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
 import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -101,8 +100,7 @@ public class SagaActorTest {
   @Before
   public void before(){
     TransactionRepository repository = new 
ElasticsearchTransactionRepository(template, metricsService, 0,0);
-    TransactionRepositoryChannel repositoryChannel = new 
MemoryTransactionRepositoryChannel(repository, -1,
-        metricsService);
+    TransactionRepositoryChannel repositoryChannel = new 
DefaultTransactionRepositoryChannel(repository, metricsService);
     
SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
   }
 
diff --git 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 2acb135..03a89b0 100644
--- 
a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ 
b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -24,11 +24,10 @@ import static org.junit.Assert.assertNotNull;
 
 import akka.actor.ActorSystem;
 import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import 
org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.After;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class SagaIntegrationTest {
   ActorSystem system;
   
   @Autowired
-  SagaActorEventSender sagaActorEventSender;
+  MemoryActorEventChannel memoryActorEventChannel;
 
   @Autowired
   MetricsService metricsService;
@@ -81,7 +80,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, 
localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -105,7 +104,7 @@ public class SagaIntegrationTest {
     final String globalTxId = UUID.randomUUID().toString();
     final String localTxId_1 = UUID.randomUUID().toString();
     SagaEventSender.firstTxAbortedEvents(globalTxId, 
localTxId_1).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
 
     await().atMost(2, SECONDS).until(() -> {
@@ -125,7 +124,7 @@ public class SagaIntegrationTest {
     final String localTxId_1 = UUID.randomUUID().toString();
     final String localTxId_2 = UUID.randomUUID().toString();
     SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, 
localTxId_2).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -146,7 +145,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, 
localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -168,7 +167,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -190,7 +189,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     
SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -212,7 +211,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -234,7 +233,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, 
localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -257,7 +256,7 @@ public class SagaIntegrationTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final int timeout = 5; // second
     SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, 
localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -279,7 +278,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, 
localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -301,7 +300,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
@@ -323,7 +322,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, 
localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaActorEventSender.send(event);
+      memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = 
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();

Reply via email to