This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit d91304a2554fb4ed11124925a85ed0e546fa2fd9
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Fri Sep 27 19:44:23 2019 +0800

    SCB-1368 Update test cases for Akka Cluster Sharding
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  23 +--
 .../pack/alpha/fsm/SagaShardRegionActor.java       |  18 +-
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |   1 -
 .../pack/alpha/fsm/SagaIntegrationTest.java        |   7 +-
 .../alpha-fsm/src/test/resources/application.yaml  | 205 +++++++++++++++++++++
 5 files changed, 232 insertions(+), 22 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index e64b82d..b7e0a1f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -56,19 +56,20 @@ public class SagaActor extends
     AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  private String persistenceId;
+  private long sagaBeginTime;
+  private long sagaEndTime;
 
   public static Props props(String persistenceId) {
     return Props.create(SagaActor.class, persistenceId);
   }
 
-  private String persistenceId;
-
-  private long sagaBeginTime;
-  private long sagaEndTime;
-
-  public SagaActor() {
-    this.persistenceId = getSelf().path().name();
+  public SagaActor(String persistenceId) {
+    if (persistenceId != null) {
+      this.persistenceId = persistenceId;
+    } else {
+      this.persistenceId = getSelf().path().name();
+    }
 
     startWith(SagaActorState.IDLE, SagaData.builder().build());
 
@@ -487,14 +488,14 @@ public class SagaActor extends
             }
           });
         } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
-          data.setEndTime(event.getEvent().getCreateTime());
+          data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
           data.setTerminated(true);
           data.setSuspendedType(domainEvent.getSuspendedType());
         } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
-          data.setEndTime(event.getEvent().getCreateTime());
+          data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
           data.setTerminated(true);
         } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
-          data.setEndTime(event.getEvent().getCreateTime());
+          data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
           data.setTerminated(true);
         }
       }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
index daa4ee4..5d0d6d8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
@@ -70,7 +70,7 @@ public class SagaShardRegionActor extends AbstractActor {
     sagaActorRegion = ClusterSharding.get(system)
         .start(
             SagaActor.class.getSimpleName(),
-            Props.create(SagaActor.class),
+            SagaActor.props(null),
             settings,
             messageExtractor);
   }
@@ -79,14 +79,16 @@ public class SagaShardRegionActor extends AbstractActor {
   public Receive createReceive() {
     return receiveBuilder()
         .matchAny(event -> {
-          final BaseEvent evt = (BaseEvent) event;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
-          }
+          if(event instanceof BaseEvent){
+            final BaseEvent evt = (BaseEvent) event;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+            }
 
-          sagaActorRegion.tell(event, getSelf());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+            sagaActorRegion.tell(event, getSelf());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+            }
           }
           getSender().tell("confirm", getSelf());
         })
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index e32fbfe..2fe812e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -228,7 +228,6 @@ public class SagaActorTest {
           SagaActorState.PARTIALLY_ACTIVE);
 
       //expectTerminated(fsm);
-
       ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
       watch(recoveredSaga);
       recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 03a89b0..508be9f 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -83,8 +83,11 @@ public class SagaIntegrationTest {
       memoryActorEventChannel.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
-      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()== SagaActorState.COMMITTED;
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+          .getLastSagaData();
+      return sagaData != null && sagaData.isTerminated()
+          && sagaData.getLastState() == SagaActorState.COMMITTED
+          && metricsService.metrics().getSagaEndCounter() == 1;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
     assertNotNull(sagaData.getBeginTime());
diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml
new file mode 100644
index 0000000..4f202be
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -0,0 +1,205 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+server:
+  port: 8090
+  host: 0.0.0.0
+
+alpha:
+  server:
+    host: ${server.host}
+    port: 8080
+  feature:
+    akka:
+      enabled: false
+      channel:
+        type: memory
+      transaction:
+        repository:
+          type: elasticsearch
+
+spring:
+  datasource:
+    initialization-mode: always
+  main:
+    allow-bean-definition-overriding: true
+  cloud:
+    consul:
+      host: 0.0.0.0
+      port: 8500
+      discovery:
+        serviceName: {spring.application.name}
+        healthCheckPath: /actuator/health
+        healthCheckInterval: 10s
+        instanceId: ${spring.application.name}-${alpha.server.host}-${random.value}
+        tags: alpha-server-host=${alpha.server.host},alpha-server-port=${alpha.server.port}
+
+eureka:
+  client:
+    enabled: false
+  instance:
+    metadataMap:
+      servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
+
+
+akkaConfig:
+  akka:
+    loglevel: INFO
+    loggers: ["akka.event.slf4j.Slf4jLogger"]
+    logging-filter: akka.event.slf4j.Slf4jLoggingFilter
+    log-dead-letters: off
+    log-dead-letters-during-shutdown: off
+    actor:
+      warn-about-java-serializer-usage: false
+      provider: cluster
+    persistence:
+      journal:
+        plugin: akka.persistence.journal.inmem
+        leveldb.dir: actor/persistence/journal
+      snapshot-store:
+        plugin: akka.persistence.snapshot-store.local
+        local.dir: actor/persistence/snapshots
+    remote:
+      watch-failure-detector:
+        acceptable-heartbeat-pause: 6s
+      artery:
+        enabled: on
+        transport: tcp
+        advanced:
+          outbound-message-queue-size: 20000
+        canonical:
+          hostname: ${alpha.server.host}
+          port: 8070
+    cluster:
+      auto-down-unreachable-after: "off" # disable automatic downing
+      failure-detector:
+        heartbeat-interval: 3s
+        acceptable-heartbeat-pause: 6s
+      seed-nodes: ["akka://alpha-cluster@0.0.0.0:8070"]
+    sharding:
+      state-store-mode: ddata
+      remember-entities: "on"
+      shard-failure-backoff: 5s
+
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+  health:
+    redis:
+      enabled: false
+    elasticsearch:
+      enabled: false
+
+---
+spring:
+  profiles: ssl
+alpha:
+  server:
+    ssl:
+      enable: true
+      cert: server.crt
+      key: server.pem
+      mutualAuth: true
+      clientCert: client.crt
+
+---
+spring:
+  profiles: prd
+  datasource:
+    username: saga
+    password: password
+    url: jdbc:postgresql://postgresql.servicecomb.io:5432/saga?useSSL=false
+    platform: postgresql
+    continue-on-error: false
+  jpa:
+    properties:
+      eclipselink:
+        ddl-generation: none
+
+---
+spring:
+  profiles: mysql
+  datasource:
+    username: saga
+    password: password
+    url: jdbc:mysql://mysql.servicecomb.io:3306/saga?useSSL=false
+    platform: mysql
+    continue-on-error: false
+  jpa:
+    properties:
+      eclipselink:
+        ddl-generation: none
+
+---
+spring:
+  profiles: cluster
+
+alpha:
+  feature:
+    akka:
+      enabled: true
+      channel:
+        type: kafka
+
+akkaConfig:
+  akka:
+    actor:
+      provider: cluster
+    persistence:
+      at-least-once-delivery:
+        redeliver-interval: 10s
+        redelivery-burst-limit: 2000
+      journal:
+        plugin: akka-persistence-redis.journal
+      snapshot-store:
+        plugin: akka-persistence-redis.snapshot
+    sharding:
+      state-store-mode: persistence
+    kafka:
+      consumer:
+        poll-interval: 50ms
+        stop-timeout: 30s
+        close-timeout: 20s
+        commit-timeout: 15s
+        commit-time-warning: 5s
+        commit-refresh-interval: infinite
+        use-dispatcher: "akka.kafka.saga-kafka"
+        kafka-clients.enable.auto.commit: false
+        wait-close-partition: 500ms
+        position-timeout: 10s
+        offset-for-times-timeout: 10s
+        metadata-request-timeout: 10s
+        eos-draining-check-interval: 30ms
+        partition-handler-warning: 5s
+        connection-checker.enable: false
+        connection-checker.max-retries: 3
+        connection-checker.check-interval: 15s
+        connection-checker.backoff-factor: 2.0
+      saga-kafka:
+        type: "Dispatcher"
+        executor: "thread-pool-executor"
+        thread-pool-executor:
+          fixed-pool-size: 20
+
+
+akka-persistence-redis:
+  redis:
+    mode: "simple"
+    host: "127.0.0.1"
+    port: 6379
+    database: 0
\ No newline at end of file

Reply via email to