This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit d91304a2554fb4ed11124925a85ed0e546fa2fd9 Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Fri Sep 27 19:44:23 2019 +0800 SCB-1368 Update test cases for Akka Cluster Sharding --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 23 +-- .../pack/alpha/fsm/SagaShardRegionActor.java | 18 +- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 1 - .../pack/alpha/fsm/SagaIntegrationTest.java | 7 +- .../alpha-fsm/src/test/resources/application.yaml | 205 +++++++++++++++++++++ 5 files changed, 232 insertions(+), 22 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index e64b82d..b7e0a1f 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -56,19 +56,20 @@ public class SagaActor extends AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + private String persistenceId; + private long sagaBeginTime; + private long sagaEndTime; public static Props props(String persistenceId) { return Props.create(SagaActor.class, persistenceId); } - private String persistenceId; - - private long sagaBeginTime; - private long sagaEndTime; - - public SagaActor() { - this.persistenceId = getSelf().path().name(); + public SagaActor(String persistenceId) { + if (persistenceId != null) { + this.persistenceId = persistenceId; + } else { + this.persistenceId = getSelf().path().name(); + } startWith(SagaActorState.IDLE, SagaData.builder().build()); @@ -487,14 +488,14 @@ public class SagaActor extends } }); } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { - data.setEndTime(event.getEvent().getCreateTime()); + data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date()); data.setTerminated(true); data.setSuspendedType(domainEvent.getSuspendedType()); } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { - data.setEndTime(event.getEvent().getCreateTime()); + data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date()); data.setTerminated(true); } else if (domainEvent.getState() == SagaActorState.COMMITTED) { - data.setEndTime(event.getEvent().getCreateTime()); + data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date()); data.setTerminated(true); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java index daa4ee4..5d0d6d8 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java @@ -70,7 +70,7 @@ public class SagaShardRegionActor extends AbstractActor { sagaActorRegion = ClusterSharding.get(system) .start( SagaActor.class.getSimpleName(), - Props.create(SagaActor.class), + SagaActor.props(null), settings, messageExtractor); } @@ -79,14 +79,16 @@ public class SagaShardRegionActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .matchAny(event -> { - final BaseEvent evt = (BaseEvent) event; - if (LOG.isDebugEnabled()) { - LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); - } + if(event instanceof BaseEvent){ + final BaseEvent evt = (BaseEvent) event; + if (LOG.isDebugEnabled()) { + LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); + } - sagaActorRegion.tell(event, getSelf()); - if (LOG.isDebugEnabled()) { - LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); + sagaActorRegion.tell(event, getSelf()); + if (LOG.isDebugEnabled()) { + LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); + } } getSender().tell("confirm", getSelf()); }) diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index e32fbfe..2fe812e 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -228,7 +228,6 @@ public class SagaActorTest { SagaActorState.PARTIALLY_ACTIVE); //expectTerminated(fsm); - ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga"); watch(recoveredSaga); recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 03a89b0..508be9f 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -83,8 +83,11 @@ public class SagaIntegrationTest { memoryActorEventChannel.send(event); }); await().atMost(2, SECONDS).until(() -> { - SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); - return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()== SagaActorState.COMMITTED; + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system) + .getLastSagaData(); + return sagaData != null && sagaData.isTerminated() + && sagaData.getLastState() == SagaActorState.COMMITTED + && metricsService.metrics().getSagaEndCounter() == 1; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData(); assertNotNull(sagaData.getBeginTime()); diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml new file mode 100644 index 0000000..4f202be --- /dev/null +++ b/alpha/alpha-fsm/src/test/resources/application.yaml @@ -0,0 +1,205 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +server: + port: 8090 + host: 0.0.0.0 + +alpha: + server: + host: ${server.host} + port: 8080 + feature: + akka: + enabled: false + channel: + type: memory + transaction: + repository: + type: elasticsearch + +spring: + datasource: + initialization-mode: always + main: + allow-bean-definition-overriding: true + cloud: + consul: + host: 0.0.0.0 + port: 8500 + discovery: + serviceName: {spring.application.name} + healthCheckPath: /actuator/health + healthCheckInterval: 10s + instanceId: ${spring.application.name}-${alpha.server.host}-${random.value} + tags: alpha-server-host=${alpha.server.host},alpha-server-port=${alpha.server.port} + +eureka: + client: + enabled: false + instance: + metadataMap: + servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port} + + +akkaConfig: + akka: + loglevel: INFO + loggers: ["akka.event.slf4j.Slf4jLogger"] + logging-filter: akka.event.slf4j.Slf4jLoggingFilter + log-dead-letters: off + log-dead-letters-during-shutdown: off + actor: + warn-about-java-serializer-usage: false + provider: cluster + persistence: + journal: + plugin: akka.persistence.journal.inmem + leveldb.dir: actor/persistence/journal + snapshot-store: + plugin: akka.persistence.snapshot-store.local + local.dir: actor/persistence/snapshots + remote: + watch-failure-detector: + acceptable-heartbeat-pause: 6s + artery: + enabled: on + transport: tcp + advanced: + outbound-message-queue-size: 20000 + canonical: + hostname: ${alpha.server.host} + port: 8070 + cluster: + auto-down-unreachable-after: "off" # disable automatic downing + failure-detector: + heartbeat-interval: 3s + acceptable-heartbeat-pause: 6s + seed-nodes: ["akka://alpha-cluster@0.0.0.0:8070"] + sharding: + state-store-mode: ddata + remember-entities: "on" + shard-failure-backoff: 5s + +management: + endpoints: + web: + exposure: + include: "*" + health: + redis: + enabled: false + elasticsearch: + enabled: false + +--- +spring: + profiles: ssl +alpha: + server: + ssl: + enable: true + cert: server.crt + key: server.pem + mutualAuth: true + clientCert: client.crt + +--- +spring: + profiles: prd + datasource: + username: saga + password: password + url: jdbc:postgresql://postgresql.servicecomb.io:5432/saga?useSSL=false + platform: postgresql + continue-on-error: false + jpa: + properties: + eclipselink: + ddl-generation: none + +--- +spring: + profiles: mysql + datasource: + username: saga + password: password + url: jdbc:mysql://mysql.servicecomb.io:3306/saga?useSSL=false + platform: mysql + continue-on-error: false + jpa: + properties: + eclipselink: + ddl-generation: none + +--- +spring: + profiles: cluster + +alpha: + feature: + akka: + enabled: true + channel: + type: kafka + +akkaConfig: + akka: + actor: + provider: cluster + persistence: + at-least-once-delivery: + redeliver-interval: 10s + redelivery-burst-limit: 2000 + journal: + plugin: akka-persistence-redis.journal + snapshot-store: + plugin: akka-persistence-redis.snapshot + sharding: + state-store-mode: persistence + kafka: + consumer: + poll-interval: 50ms + stop-timeout: 30s + close-timeout: 20s + commit-timeout: 15s + commit-time-warning: 5s + commit-refresh-interval: infinite + use-dispatcher: "akka.kafka.saga-kafka" + kafka-clients.enable.auto.commit: false + wait-close-partition: 500ms + position-timeout: 10s + offset-for-times-timeout: 10s + metadata-request-timeout: 10s + eos-draining-check-interval: 30ms + partition-handler-warning: 5s + connection-checker.enable: false + connection-checker.max-retries: 3 + connection-checker.check-interval: 15s + connection-checker.backoff-factor: 2.0 + saga-kafka: + type: "Dispatcher" + executor: "thread-pool-executor" + thread-pool-executor: + fixed-pool-size: 20 + + +akka-persistence-redis: + redis: + mode: "simple" + host: "127.0.0.1" + port: 6379 + database: 0 \ No newline at end of file