This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 2d98d6b2c0b17274c5a6af82a5946ff4ce860450 Author: Lei Zhang <zhang...@apache.org> AuthorDate: Sat Mar 21 22:12:21 2020 +0800 SCB-1735 Catch the compensation method timeout exception and send a CompensateAckTimeoutEvent event and record --- .../event/internal/CompensateAckTimeoutEvent.java | 86 ++++++++++++++++++++++ .../servicecomb/pack/alpha/fsm/SagaActor.java | 44 +++++++++-- .../pack/alpha/fsm/domain/UpdateTxEventDomain.java | 9 +++ 3 files changed, 132 insertions(+), 7 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java new file mode 100644 index 0000000..d21503b --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.core.fsm.event.internal; + +import java.util.Date; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent; + +public class CompensateAckTimeoutEvent extends TxEvent { + + private byte[] payloads; + + public byte[] getPayloads() { + return payloads; + } + + public void setPayloads(byte[] payloads) { + this.payloads = payloads; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private CompensateAckTimeoutEvent txCompensatedEvent; + + private Builder() { + txCompensatedEvent = new CompensateAckTimeoutEvent(); + } + + public Builder serviceName(String serviceName) { + txCompensatedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txCompensatedEvent.setInstanceId(instanceId); + return this; + } + + public Builder parentTxId(String parentTxId) { + txCompensatedEvent.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txCompensatedEvent.setLocalTxId(localTxId); + return this; + } + + public Builder globalTxId(String globalTxId) { + txCompensatedEvent.setGlobalTxId(globalTxId); + return this; + } + + public Builder createTime(Date createTime){ + txCompensatedEvent.setCreateTime(createTime); + return this; + } + + public Builder payloads(byte[] payloads){ + txCompensatedEvent.setPayloads(payloads); + return this; + } + + public CompensateAckTimeoutEvent build() { + return txCompensatedEvent; + } + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index a7c74e2..5e8aee8 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -21,17 +21,21 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.cluster.sharding.ShardRegion; import akka.persistence.fsm.AbstractPersistentFSM; +import java.io.PrintWriter; +import java.io.StringWriter; import java.lang.invoke.MethodHandles; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.servicecomb.pack.alpha.core.AlphaException; import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType; import org.apache.servicecomb.pack.alpha.core.fsm.TxState; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent; import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain; import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent; import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain; @@ -55,7 +59,7 @@ import scala.concurrent.duration.Duration; public class SagaActor extends AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> { - + protected static final int PAYLOADS_MAX_LENGTH = 10240; private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String persistenceId; private long sagaBeginTime; @@ -245,6 +249,13 @@ public class SagaActor extends self().tell(ComponsitedCheckEvent.builder().build(), self()); })); } + ).event(CompensateAckTimeoutEvent.class, SagaData.class, + (event, data) -> { + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); + return stay().applying(domainEvent).andThen(exec(_data -> { + self().tell(ComponsitedCheckEvent.builder().build(), self()); + })); + } ).event(ComponsitedCheckEvent.class, SagaData.class, (event, data) -> { if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) { @@ -485,15 +496,16 @@ public class SagaActor extends compensation(v, data); } }); - } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED || domainEvent.getState() == TxState.COMPENSATED_FAILED) { + } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) { // decrement the compensation running counter by one data.getCompensationRunningCounter().decrementAndGet(); txEntity.setState(domainEvent.getState()); - if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) { - LOG.info("compensation is succeed {}", txEntity.getLocalTxId()); - } else { - LOG.info("compensation is failed {}", txEntity.getLocalTxId()); - } + LOG.info("compensation is succeed {}", txEntity.getLocalTxId()); + } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) { + data.getCompensationRunningCounter().decrementAndGet(); + txEntity.setState(domainEvent.getState()); + txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads()); + LOG.info("compensation is failed {}", txEntity.getLocalTxId()); } } else if (event instanceof SagaEndedDomain) { SagaEndedDomain domainEvent = (SagaEndedDomain) event; @@ -562,6 +574,24 @@ public class SagaActor extends } compensation(txEntity, data); } catch (Exception ex) { + if (ex instanceof TimeoutException) { + StringWriter writer = new StringWriter(); + ex.printStackTrace(new PrintWriter(writer)); + String stackTrace = writer.toString(); + if (stackTrace.length() > PAYLOADS_MAX_LENGTH) { + stackTrace = stackTrace.substring(0, PAYLOADS_MAX_LENGTH); + } + CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder() + .createTime(new Date(System.currentTimeMillis())) + .globalTxId(txEntity.getGlobalTxId()) + .parentTxId(txEntity.getParentTxId()) + .localTxId(txEntity.getLocalTxId()) + .serviceName(txEntity.getServiceName()) + .instanceId(txEntity.getInstanceId()) + .payloads(stackTrace.getBytes()) + .build(); + self().tell(event, self()); + } LOG.error("compensation failed " + txEntity.getLocalTxId(), ex); if (txEntity.getReverseRetries() > 0) { // which means the retry number diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java index 79d5dc0..24877f7 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java @@ -21,6 +21,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.TxState; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent; +import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; @@ -60,6 +61,14 @@ public class UpdateTxEventDomain implements DomainEvent { this.state = TxState.COMPENSATED_FAILED; } + public UpdateTxEventDomain(CompensateAckTimeoutEvent event) { + this.event = event; + this.parentTxId = event.getParentTxId(); + this.localTxId = event.getLocalTxId(); + this.throwablePayLoads = event.getPayloads(); + this.state = TxState.COMPENSATED_FAILED; + } + public String getParentTxId() { return parentTxId; }