This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 72c2ed628e46a916a516803b6c0391ceb4fb4bbb
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Fri Sep 27 11:53:53 2019 +0800

    SCB-1368 Ensure message delivery reliability between Kafka and ClusterShardRegion in cluster mode
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 90 +++++++++-------------
 .../pack/alpha/fsm/SagaShardRegionActor.java       | 25 ++++--
 .../fsm/channel/kafka/KafkaSagaEventConsumer.java  | 21 +++--
 .../src/main/resources/application.yaml            | 17 ++--
 4 files changed, 81 insertions(+), 72 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 4bd536e..e64b82d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -75,7 +75,6 @@ public class SagaActor extends
     when(SagaActorState.IDLE,
         matchEvent(SagaStartedEvent.class,
             (event, data) -> {
-              log(event);
               sagaBeginTime = System.currentTimeMillis();
               
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
               SagaStartedDomain domainEvent = new SagaStartedDomain(event);
@@ -96,7 +95,6 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -109,14 +107,12 @@ public class SagaActor extends
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
@@ -132,7 +128,6 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_ACTIVE,
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
@@ -145,7 +140,6 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
-              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return stay()
@@ -157,7 +151,6 @@ public class SagaActor extends
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED,
                   SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
@@ -165,7 +158,6 @@ public class SagaActor extends
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return goTo(SagaActorState.FAILED)
                   .applying(domainEvent);
@@ -180,7 +172,6 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
-              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -193,7 +184,6 @@ public class SagaActor extends
             }
         ).event(TxEndedEvent.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return stay()
@@ -205,27 +195,23 @@ public class SagaActor extends
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
                   .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.FAILED);
               return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
@@ -239,14 +225,12 @@ public class SagaActor extends
     when(SagaActorState.FAILED,
         matchEvent(SagaTimeoutEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 self().tell(ComponsitedCheckEvent.builder().build(), self());
@@ -254,7 +238,6 @@ public class SagaActor extends
             }
         ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               if (hasCompensationSentTx(data) || !data.isTerminated()) {
                 return stay();
               } else {
@@ -266,7 +249,6 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               data.setTerminated(true);
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.FAILED);
@@ -285,13 +267,11 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 TxEntity txEntity = 
_data.getTxEntityMap().get(event.getLocalTxId());
@@ -310,8 +290,7 @@ public class SagaActor extends
     when(SagaActorState.COMMITTED,
         
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              log(event);
-              beforeStop(stateName(), data);
+              beforeStop(event, stateName(), data);
               return stop();
             }
         )
@@ -320,8 +299,7 @@ public class SagaActor extends
     when(SagaActorState.SUSPENDED,
         
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              log(event);
-              beforeStop(stateName(), data);
+              beforeStop(event, stateName(), data);
               return stop();
             }
         )
@@ -330,8 +308,7 @@ public class SagaActor extends
     when(SagaActorState.COMPENSATED,
         
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              log(event);
-              beforeStop(stateName(), data);
+              beforeStop(event, stateName(), data);
               return stop();
             }
         )
@@ -339,7 +316,9 @@ public class SagaActor extends
 
     whenUnhandled(
         matchAnyEvent((event, data) -> {
-          LOG.error("Unhandled event {}", event);
+          if (event instanceof BaseEvent){
+            LOG.error("Unhandled event {}", event);
+          }
           return stay();
         })
     );
@@ -352,33 +331,29 @@ public class SagaActor extends
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
           if (LOG.isDebugEnabled()) {
-            LOG.debug("transition {} {} -> {}", stateData().getGlobalTxId(), 
from, to);
+            LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), 
from, to);
           }
           if (to == SagaActorState.COMMITTED ||
               to == SagaActorState.SUSPENDED ||
               to == SagaActorState.COMPENSATED) {
             
self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(),
 self());
           }
-          LOG.info("transition {} {} -> {}", stateData().getGlobalTxId(), 
from, to);
         })
     );
 
     onTermination(
         matchStop(
             Normal(), (state, data) -> {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("saga actor stopped {} {}", getSelf(), state);
-              }
-              LOG.info("stopped {} {}", data.getGlobalTxId(), state);
+              LOG.info("stopped [{}] {}", data.getGlobalTxId(), state);
             }
         )
     );
 
   }
 
-  private void beforeStop(SagaActorState state, SagaData data){
+  private void beforeStop(BaseEvent event, SagaActorState state, SagaData 
data){
     if (LOG.isDebugEnabled()) {
-      LOG.debug("stop {} {}", data.getGlobalTxId(), state);
+      LOG.debug("stop [{}] {}", data.getGlobalTxId(), state);
     }
     try{
       sagaEndTime = System.currentTimeMillis();
@@ -394,11 +369,8 @@ public class SagaActor extends
       // destroy self from cluster shard region
       getContext().getParent()
           .tell(new ShardRegion.Passivate(PoisonPill.getInstance()), 
getSelf());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("destroy saga actor {} from cluster shard region", 
getSelf());
-      }
 
-      // clear self mailbox from persistence
+      //  clear self mailbox from persistence
       //  已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
       //  以下基于 journal-redis 说明:
       //    假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 
中会生成三个 key
@@ -418,10 +390,30 @@ public class SagaActor extends
       //      并删除 journal:persisted:item:highestSequenceNr
       //
       //  目前可以看到的解释是 https://github.com/akka/akka/issues/21181
+      //
+      //  Lua script akka-persistence-redis-clean.lua
+
+      //  local ids = redis.call('smembers','journal:persistenceIds');
+      //  local delkeys = {};
+      //  for k, v in pairs(ids) do
+      //    local jpid = 'journal:persisted:' .. v;
+      //    local jpidnr = 'journal:persisted:' .. v .. ':highestSequenceNr';
+      //    local hasjpid  = redis.call('exists',jpid);
+      //    if(hasjpid == 0)
+      //    then
+      //      local hasjpidnr  = redis.call('exists',jpidnr);
+      //      if(hasjpidnr == 1)
+      //      then
+      //        redis.call('del',jpidnr);
+      //        table.insert(delkeys,jpid);
+      //      end
+      //    end
+      //  end
+      //  return delkeys;
       deleteMessages(lastSequenceNr());
       deleteSnapshot(snapshotSequenceNr());
     }catch(Exception e){
-      LOG.error("stop {} fail",data.getGlobalTxId());
+      LOG.error("stop [{}] fail",data.getGlobalTxId());
       throw e;
     }
   }
@@ -430,11 +422,10 @@ public class SagaActor extends
   public SagaData applyEvent(DomainEvent event, SagaData data) {
     try{
       if (this.recoveryRunning()) {
-        LOG.info("SagaActor recovery {}",event.getEvent());
+        LOG.info("recovery {}",event.getEvent());
       }else if (LOG.isDebugEnabled()) {
-        LOG.debug("SagaActor apply event {}", event.getEvent());
+        LOG.debug("persistence {}", event.getEvent());
       }
-      // log event to SagaData
       if (event.getEvent() != null && !(event
           .getEvent() instanceof ComponsitedCheckEvent)) {
         data.logEvent(event.getEvent());
@@ -508,8 +499,9 @@ public class SagaActor extends
         }
       }
     }catch (Exception ex){
-      LOG.error("SagaActor apply event {}", event.getEvent());
-      beforeStop(SagaActorState.SUSPENDED, data);
+      LOG.error("apply {}", event.getEvent(), ex);
+      LOG.error(ex.getMessage(), ex);
+      beforeStop(event.getEvent(), SagaActorState.SUSPENDED, data);
       stop();
       //TODO 增加 SagaActor 处理失败指标
     }
@@ -519,7 +511,7 @@ public class SagaActor extends
   @Override
   public void onRecoveryCompleted() {
     if(stateName() != SagaActorState.IDLE){
-      LOG.info("SagaActor {} recovery completed, state={}", 
stateData().getGlobalTxId(), stateName());
+      LOG.info("recovery completed [{}] state={}", 
stateData().getGlobalTxId(), stateName());
     }
   }
 
@@ -585,10 +577,4 @@ public class SagaActor extends
       }
     }
   }
-
-  private void log(BaseEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(event.toString());
-    }
-  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
index 6e39033..daa4ee4 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
@@ -24,11 +24,15 @@ import akka.actor.Props;
 import akka.cluster.sharding.ClusterSharding;
 import akka.cluster.sharding.ClusterShardingSettings;
 import akka.cluster.sharding.ShardRegion;
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SagaShardRegionActor extends AbstractActor {
 
-  private final ActorRef workerRegion;
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorRef sagaActorRegion;
 
   static ShardRegion.MessageExtractor messageExtractor = new 
ShardRegion.MessageExtractor() {
     @Override
@@ -47,7 +51,7 @@ public class SagaShardRegionActor extends AbstractActor {
 
     @Override
     public String shardId(Object message) {
-      int numberOfShards = 100;
+      int numberOfShards = 10; // NOTE: Greater than the number of alpha nodes
       if (message instanceof BaseEvent) {
         String actorId = ((BaseEvent) message).getGlobalTxId();
         return String.valueOf(actorId.hashCode() % numberOfShards);
@@ -63,9 +67,9 @@ public class SagaShardRegionActor extends AbstractActor {
   public SagaShardRegionActor() {
     ActorSystem system = getContext().getSystem();
     ClusterShardingSettings settings = ClusterShardingSettings.create(system);
-    workerRegion = ClusterSharding.get(system)
+    sagaActorRegion = ClusterSharding.get(system)
         .start(
-            "saga-shard-region-actor",
+            SagaActor.class.getSimpleName(),
             Props.create(SagaActor.class),
             settings,
             messageExtractor);
@@ -74,8 +78,17 @@ public class SagaShardRegionActor extends AbstractActor {
   @Override
   public Receive createReceive() {
     return receiveBuilder()
-        .matchAny(msg -> {
-          workerRegion.tell(msg, getSelf());
+        .matchAny(event -> {
+          final BaseEvent evt = (BaseEvent) event;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), 
evt.getLocalTxId());
+          }
+
+          sagaActorRegion.tell(event, getSelf());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), 
evt.getLocalTxId());
+          }
+          getSender().tell("confirm", getSelf());
         })
         .build();
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
index 7790c12..8ee2d40 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
@@ -23,9 +23,11 @@ import akka.kafka.ConsumerMessage;
 import akka.kafka.ConsumerSettings;
 import akka.kafka.Subscriptions;
 import akka.kafka.javadsl.Consumer;
+import akka.pattern.Patterns;
 import akka.stream.ActorMaterializer;
 import akka.stream.Materializer;
 import akka.stream.javadsl.Sink;
+import akka.util.Timeout;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import java.lang.invoke.MethodHandles;
@@ -39,6 +41,9 @@ import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class KafkaSagaEventConsumer extends AbstractEventConsumer {
 
@@ -64,10 +69,10 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
             .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"StringDeserializer.class")
             .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"StringDeserializer.class");
     Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
-        .mapAsync(10, event -> {
+        .mapAsync(20, event -> {
           BaseEvent bean = jsonMapper.readValue(event.record().value(), 
BaseEvent.class);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("kafka receive {} {}", bean.getGlobalTxId(), 
bean.getType());
+            LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), 
bean.getType(), bean.getLocalTxId());
           }
           return sendSagaActor(bean).thenApply(done -> 
event.committableOffset());
         })
@@ -76,7 +81,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
             ConsumerMessage::createCommittableOffsetBatch,
             ConsumerMessage.CommittableOffsetBatch::updated
         )
-        .mapAsync(10, offset -> offset.commitJavadsl())
+        .mapAsync(20, offset -> offset.commitJavadsl())
         .to(Sink.ignore())
         .run(materializer);
   }
@@ -85,14 +90,14 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
     try {
       long begin = System.currentTimeMillis();
       metricsService.metrics().doActorReceived();
-      sagaShardRegionActor.tell(event, sagaShardRegionActor);
+      // Use the synchronous method call to ensure that Kafka's Offset is set 
after the delivery is successful.
+      Timeout timeout = new Timeout(Duration.create(10, "seconds"));
+      Future<Object> future = Patterns.ask(sagaShardRegionActor, event, 
timeout);
+      Await.result(future, timeout.duration());
       long end = System.currentTimeMillis();
       metricsService.metrics().doActorAccepted();
       metricsService.metrics().doActorAvgTime(end - begin);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("send saga actor {} {}", event, event.getType());
-      }
-      return CompletableFuture.completedFuture("");
+      return CompletableFuture.completedFuture("OK");
     } catch (Exception ex) {
       LOG.error(ex.getMessage(),ex);
       metricsService.metrics().doActorRejected();
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 664f692..44c53af 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -88,11 +88,11 @@ akkaConfig:
       failure-detector:
         heartbeat-interval: 3s
         acceptable-heartbeat-pause: 6s
-      seed-nodes: ["akka://alpha-cluster@127.0.0.1:8070"]
-      sharding:
-        state-store-mode: "ddata" #ddata,persistence
-        remember-entities: true
-        shard-failure-backoff: 5s
+      seed-nodes: ["akka://alpha-cluster@0.0.0.0:8070"]
+    sharding:
+      state-store-mode: ddata
+      remember-entities: "on"
+      shard-failure-backoff: 5s
 
 management:
   endpoints:
@@ -160,11 +160,16 @@ akkaConfig:
   akka:
     actor:
       provider: cluster
-    persistence: # redis persistence
+    persistence:
+      at-least-once-delivery:
+        redeliver-interval: 10s
+        redelivery-burst-limit: 2000
       journal:
         plugin: akka-persistence-redis.journal
       snapshot-store:
         plugin: akka-persistence-redis.snapshot
+    sharding:
+      state-store-mode: persistence
     kafka:
       consumer:
         poll-interval: 50ms

Reply via email to