This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 72c2ed628e46a916a516803b6c0391ceb4fb4bbb Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Fri Sep 27 11:53:53 2019 +0800 SCB-1368 Ensure message delivery reliability between Kafka and ClusterShardRegion in cluster mode --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 90 +++++++++------------- .../pack/alpha/fsm/SagaShardRegionActor.java | 25 ++++-- .../fsm/channel/kafka/KafkaSagaEventConsumer.java | 21 +++-- .../src/main/resources/application.yaml | 17 ++-- 4 files changed, 81 insertions(+), 72 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index 4bd536e..e64b82d 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -75,7 +75,6 @@ public class SagaActor extends when(SagaActorState.IDLE, matchEvent(SagaStartedEvent.class, (event, data) -> { - log(event); sagaBeginTime = System.currentTimeMillis(); SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter(); SagaStartedDomain domainEvent = new SagaStartedDomain(event); @@ -96,7 +95,6 @@ public class SagaActor extends when(SagaActorState.READY, matchEvent(TxStartedEvent.class, SagaData.class, (event, data) -> { - log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) @@ -109,14 +107,12 @@ public class SagaActor extends } ).event(SagaEndedEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); } ).event(SagaAbortedEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, 
SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); @@ -132,7 +128,6 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_ACTIVE, matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_COMMITTED) @@ -145,7 +140,6 @@ public class SagaActor extends } ).event(TxStartedEvent.class, (event, data) -> { - log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return stay() @@ -157,7 +151,6 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) @@ -165,7 +158,6 @@ public class SagaActor extends } ).event(TxAbortedEvent.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED) .applying(domainEvent); @@ -180,7 +172,6 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_COMMITTED, matchEvent(TxStartedEvent.class, (event, data) -> { - log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) @@ -193,7 +184,6 @@ public class SagaActor extends } ).event(TxEndedEvent.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); if (data.getExpirationTime() != null) { return stay() @@ -205,27 +195,23 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) 
.applying(domainEvent); } ).event(SagaEndedEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED); return goTo(SagaActorState.COMMITTED) .applying(domainEvent); } ).event(SagaAbortedEvent.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(TxAbortedEvent.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED).applying(domainEvent); } @@ -239,14 +225,12 @@ public class SagaActor extends when(SagaActorState.FAILED, matchEvent(SagaTimeoutEvent.class, SagaData.class, (event, data) -> { - log(event); SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent); } ).event(TxCompensatedEvent.class, SagaData.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { self().tell(ComponsitedCheckEvent.builder().build(), self()); @@ -254,7 +238,6 @@ public class SagaActor extends } ).event(ComponsitedCheckEvent.class, SagaData.class, (event, data) -> { - log(event); if (hasCompensationSentTx(data) || !data.isTerminated()) { return stay(); } else { @@ -266,7 +249,6 @@ public class SagaActor extends } ).event(SagaAbortedEvent.class, SagaData.class, (event, data) -> { - log(event); data.setTerminated(true); if (hasCommittedTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); @@ -285,13 +267,11 @@ public class SagaActor extends } ).event(TxStartedEvent.class, SagaData.class, (event, data) -> { - log(event); AddTxEventDomain domainEvent = new AddTxEventDomain(event); return stay().applying(domainEvent); } ).event(TxEndedEvent.class, 
SagaData.class, (event, data) -> { - log(event); UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId()); @@ -310,8 +290,7 @@ public class SagaActor extends when(SagaActorState.COMMITTED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - log(event); - beforeStop(stateName(), data); + beforeStop(event, stateName(), data); return stop(); } ) @@ -320,8 +299,7 @@ public class SagaActor extends when(SagaActorState.SUSPENDED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - log(event); - beforeStop(stateName(), data); + beforeStop(event, stateName(), data); return stop(); } ) @@ -330,8 +308,7 @@ public class SagaActor extends when(SagaActorState.COMPENSATED, matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class, (event, data) -> { - log(event); - beforeStop(stateName(), data); + beforeStop(event, stateName(), data); return stop(); } ) @@ -339,7 +316,9 @@ public class SagaActor extends whenUnhandled( matchAnyEvent((event, data) -> { - LOG.error("Unhandled event {}", event); + if (event instanceof BaseEvent){ + LOG.error("Unhandled event {}", event); + } return stay(); }) ); @@ -352,33 +331,29 @@ public class SagaActor extends .putSagaData(stateData().getGlobalTxId(), stateData()); } if (LOG.isDebugEnabled()) { - LOG.debug("transition {} {} -> {}", stateData().getGlobalTxId(), from, to); + LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), from, to); } if (to == SagaActorState.COMMITTED || to == SagaActorState.SUSPENDED || to == SagaActorState.COMPENSATED) { self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self()); } - LOG.info("transition {} {} -> {}", stateData().getGlobalTxId(), from, to); }) ); onTermination( matchStop( 
Normal(), (state, data) -> { - if (LOG.isDebugEnabled()) { - LOG.debug("saga actor stopped {} {}", getSelf(), state); - } - LOG.info("stopped {} {}", data.getGlobalTxId(), state); + LOG.info("stopped [{}] {}", data.getGlobalTxId(), state); } ) ); } - private void beforeStop(SagaActorState state, SagaData data){ + private void beforeStop(BaseEvent event, SagaActorState state, SagaData data){ if (LOG.isDebugEnabled()) { - LOG.debug("stop {} {}", data.getGlobalTxId(), state); + LOG.debug("stop [{}] {}", data.getGlobalTxId(), state); } try{ sagaEndTime = System.currentTimeMillis(); @@ -394,11 +369,8 @@ public class SagaActor extends // destroy self from cluster shard region getContext().getParent() .tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf()); - if (LOG.isDebugEnabled()) { - LOG.debug("destroy saga actor {} from cluster shard region", getSelf()); - } - // clear self mailbox from persistence + // clear self mailbox from persistence // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理 // 以下基于 journal-redis 说明: // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key @@ -418,10 +390,30 @@ public class SagaActor extends // 并删除 journal:persisted:item:highestSequenceNr // // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181 + // + // Lua script akka-persistence-redis-clean.lua + + // local ids = redis.call('smembers','journal:persistenceIds'); + // local delkeys = {}; + // for k, v in pairs(ids) do + // local jpid = 'journal:persisted:' .. v; + // local jpidnr = 'journal:persisted:' .. v .. 
':highestSequenceNr'; + // local hasjpid = redis.call('exists',jpid); + // if(hasjpid == 0) + // then + // local hasjpidnr = redis.call('exists',jpidnr); + // if(hasjpidnr == 1) + // then + // redis.call('del',jpidnr); + // table.insert(delkeys,jpid); + // end + // end + // end + // return delkeys; deleteMessages(lastSequenceNr()); deleteSnapshot(snapshotSequenceNr()); }catch(Exception e){ - LOG.error("stop {} fail",data.getGlobalTxId()); + LOG.error("stop [{}] fail",data.getGlobalTxId()); throw e; } } @@ -430,11 +422,10 @@ public class SagaActor extends public SagaData applyEvent(DomainEvent event, SagaData data) { try{ if (this.recoveryRunning()) { - LOG.info("SagaActor recovery {}",event.getEvent()); + LOG.info("recovery {}",event.getEvent()); }else if (LOG.isDebugEnabled()) { - LOG.debug("SagaActor apply event {}", event.getEvent()); + LOG.debug("persistence {}", event.getEvent()); } - // log event to SagaData if (event.getEvent() != null && !(event .getEvent() instanceof ComponsitedCheckEvent)) { data.logEvent(event.getEvent()); @@ -508,8 +499,9 @@ public class SagaActor extends } } }catch (Exception ex){ - LOG.error("SagaActor apply event {}", event.getEvent()); - beforeStop(SagaActorState.SUSPENDED, data); + LOG.error("apply {}", event.getEvent(), ex); + LOG.error(ex.getMessage(), ex); + beforeStop(event.getEvent(), SagaActorState.SUSPENDED, data); stop(); //TODO 增加 SagaActor 处理失败指标 } @@ -519,7 +511,7 @@ public class SagaActor extends @Override public void onRecoveryCompleted() { if(stateName() != SagaActorState.IDLE){ - LOG.info("SagaActor {} recovery completed, state={}", stateData().getGlobalTxId(), stateName()); + LOG.info("recovery completed [{}] state={}", stateData().getGlobalTxId(), stateName()); } } @@ -585,10 +577,4 @@ public class SagaActor extends } } } - - private void log(BaseEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug(event.toString()); - } - } } diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java index 6e39033..daa4ee4 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java @@ -24,11 +24,15 @@ import akka.actor.Props; import akka.cluster.sharding.ClusterSharding; import akka.cluster.sharding.ClusterShardingSettings; import akka.cluster.sharding.ShardRegion; +import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SagaShardRegionActor extends AbstractActor { - private final ActorRef workerRegion; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActorRef sagaActorRegion; static ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() { @Override @@ -47,7 +51,7 @@ public class SagaShardRegionActor extends AbstractActor { @Override public String shardId(Object message) { - int numberOfShards = 100; + int numberOfShards = 10; // NOTE: Greater than the number of alpha nodes if (message instanceof BaseEvent) { String actorId = ((BaseEvent) message).getGlobalTxId(); return String.valueOf(actorId.hashCode() % numberOfShards); @@ -63,9 +67,9 @@ public class SagaShardRegionActor extends AbstractActor { public SagaShardRegionActor() { ActorSystem system = getContext().getSystem(); ClusterShardingSettings settings = ClusterShardingSettings.create(system); - workerRegion = ClusterSharding.get(system) + sagaActorRegion = ClusterSharding.get(system) .start( - "saga-shard-region-actor", + SagaActor.class.getSimpleName(), Props.create(SagaActor.class), settings, messageExtractor); @@ -74,8 +78,17 @@ public class 
SagaShardRegionActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() - .matchAny(msg -> { - workerRegion.tell(msg, getSelf()); + .matchAny(event -> { + final BaseEvent evt = (BaseEvent) event; + if (LOG.isDebugEnabled()) { + LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); + } + + sagaActorRegion.tell(event, getSelf()); + if (LOG.isDebugEnabled()) { + LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId()); + } + getSender().tell("confirm", getSelf()); }) .build(); } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java index 7790c12..8ee2d40 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java @@ -23,9 +23,11 @@ import akka.kafka.ConsumerMessage; import akka.kafka.ConsumerSettings; import akka.kafka.Subscriptions; import akka.kafka.javadsl.Consumer; +import akka.pattern.Patterns; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.Sink; +import akka.util.Timeout; import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import java.lang.invoke.MethodHandles; @@ -39,6 +41,9 @@ import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; public class KafkaSagaEventConsumer extends AbstractEventConsumer { @@ -64,10 +69,10 @@ public class KafkaSagaEventConsumer extends 
AbstractEventConsumer { .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class") .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class"); Consumer.committableSource(consumerSettings, Subscriptions.topics(topic)) - .mapAsync(10, event -> { + .mapAsync(20, event -> { BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class); if (LOG.isDebugEnabled()) { - LOG.debug("kafka receive {} {}", bean.getGlobalTxId(), bean.getType()); + LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId()); } return sendSagaActor(bean).thenApply(done -> event.committableOffset()); }) @@ -76,7 +81,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer { ConsumerMessage::createCommittableOffsetBatch, ConsumerMessage.CommittableOffsetBatch::updated ) - .mapAsync(10, offset -> offset.commitJavadsl()) + .mapAsync(20, offset -> offset.commitJavadsl()) .to(Sink.ignore()) .run(materializer); } @@ -85,14 +90,14 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer { try { long begin = System.currentTimeMillis(); metricsService.metrics().doActorReceived(); - sagaShardRegionActor.tell(event, sagaShardRegionActor); + // Use the synchronous method call to ensure that Kafka's Offset is set after the delivery is successful. 
+ Timeout timeout = new Timeout(Duration.create(10, "seconds")); + Future<Object> future = Patterns.ask(sagaShardRegionActor, event, timeout); + Await.result(future, timeout.duration()); long end = System.currentTimeMillis(); metricsService.metrics().doActorAccepted(); metricsService.metrics().doActorAvgTime(end - begin); - if (LOG.isDebugEnabled()) { - LOG.debug("send saga actor {} {}", event, event.getType()); - } - return CompletableFuture.completedFuture(""); + return CompletableFuture.completedFuture("OK"); } catch (Exception ex) { LOG.error(ex.getMessage(),ex); metricsService.metrics().doActorRejected(); diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml index 664f692..44c53af 100644 --- a/alpha/alpha-server/src/main/resources/application.yaml +++ b/alpha/alpha-server/src/main/resources/application.yaml @@ -88,11 +88,11 @@ akkaConfig: failure-detector: heartbeat-interval: 3s acceptable-heartbeat-pause: 6s - seed-nodes: ["akka://alpha-cluster@127.0.0.1:8070"] - sharding: - state-store-mode: "ddata" #ddata,persistence - remember-entities: true - shard-failure-backoff: 5s + seed-nodes: ["akka://alpha-cluster@0.0.0.0:8070"] + sharding: + state-store-mode: ddata + remember-entities: "on" + shard-failure-backoff: 5s management: endpoints: @@ -160,11 +160,16 @@ akkaConfig: akka: actor: provider: cluster - persistence: # redis persistence + persistence: + at-least-once-delivery: + redeliver-interval: 10s + redelivery-burst-limit: 2000 journal: plugin: akka-persistence-redis.journal snapshot-store: plugin: akka-persistence-redis.snapshot + sharding: + state-store-mode: persistence kafka: consumer: poll-interval: 50ms