This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 2d98d6b2c0b17274c5a6af82a5946ff4ce860450
Author: Lei Zhang <zhang...@apache.org>
AuthorDate: Sat Mar 21 22:12:21 2020 +0800

    SCB-1735 Catch the compensation method timeout exception, send a
    CompensateAckTimeoutEvent, and record it
---
 .../event/internal/CompensateAckTimeoutEvent.java  | 86 ++++++++++++++++++++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 44 +++++++++--
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java |  9 +++
 3 files changed, 132 insertions(+), 7 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java
new file mode 100644
index 0000000..d21503b
--- /dev/null
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.core.fsm.event.internal;
+
+import java.util.Date;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
+
+public class CompensateAckTimeoutEvent extends TxEvent {
+
+  private byte[] payloads;
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+
+    private CompensateAckTimeoutEvent txCompensatedEvent;
+
+    private Builder() {
+      txCompensatedEvent = new CompensateAckTimeoutEvent();
+    }
+
+    public Builder serviceName(String serviceName) {
+      txCompensatedEvent.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txCompensatedEvent.setInstanceId(instanceId);
+      return this;
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      txCompensatedEvent.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      txCompensatedEvent.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      txCompensatedEvent.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder createTime(Date createTime){
+      txCompensatedEvent.setCreateTime(createTime);
+      return this;
+    }
+
+    public Builder payloads(byte[] payloads){
+      txCompensatedEvent.setPayloads(payloads);
+      return this;
+    }
+
+    public CompensateAckTimeoutEvent build() {
+      return txCompensatedEvent;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index a7c74e2..5e8aee8 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -21,17 +21,21 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.cluster.sharding.ShardRegion;
 import akka.persistence.fsm.AbstractPersistentFSM;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.servicecomb.pack.alpha.core.AlphaException;
 import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import 
org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
 import 
org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import 
org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
@@ -55,7 +59,7 @@ import scala.concurrent.duration.Duration;
 
 public class SagaActor extends
     AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
-
+  protected static final int PAYLOADS_MAX_LENGTH = 10240;
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private String persistenceId;
   private long sagaBeginTime;
@@ -245,6 +249,13 @@ public class SagaActor extends
                 self().tell(ComponsitedCheckEvent.builder().build(), self());
               }));
             }
+        ).event(CompensateAckTimeoutEvent.class, SagaData.class,
+            (event, data) -> {
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                self().tell(ComponsitedCheckEvent.builder().build(), self());
+              }));
+            }
         ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
               if (data.getTxEntities().hasCompensationSentTx() || 
!data.isTerminated()) {
@@ -485,15 +496,16 @@ public class SagaActor extends
               compensation(v, data);
             }
           });
-        } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED || 
domainEvent.getState() == TxState.COMPENSATED_FAILED) {
+        } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
           // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
           txEntity.setState(domainEvent.getState());
-          if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
-            LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
-          } else {
-            LOG.info("compensation is failed {}", txEntity.getLocalTxId());
-          }
+          LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+        } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
+          data.getCompensationRunningCounter().decrementAndGet();
+          txEntity.setState(domainEvent.getState());
+          txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
+          LOG.info("compensation is failed {}", txEntity.getLocalTxId());
         }
       } else if (event instanceof SagaEndedDomain) {
         SagaEndedDomain domainEvent = (SagaEndedDomain) event;
@@ -562,6 +574,24 @@ public class SagaActor extends
       }
       compensation(txEntity, data);
     } catch (Exception ex) {
+      if (ex instanceof TimeoutException) {
+        StringWriter writer = new StringWriter();
+        ex.printStackTrace(new PrintWriter(writer));
+        String stackTrace = writer.toString();
+        if (stackTrace.length() > PAYLOADS_MAX_LENGTH) {
+          stackTrace = stackTrace.substring(0, PAYLOADS_MAX_LENGTH);
+        }
+        CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
+            .createTime(new Date(System.currentTimeMillis()))
+            .globalTxId(txEntity.getGlobalTxId())
+            .parentTxId(txEntity.getParentTxId())
+            .localTxId(txEntity.getLocalTxId())
+            .serviceName(txEntity.getServiceName())
+            .instanceId(txEntity.getInstanceId())
+            .payloads(stackTrace.getBytes())
+            .build();
+        self().tell(event, self());
+      }
       LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
       if (txEntity.getReverseRetries() > 0) {
         // which means the retry number
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 79d5dc0..24877f7 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -21,6 +21,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
 import 
org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
 import 
org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import 
org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 
@@ -60,6 +61,14 @@ public class UpdateTxEventDomain implements DomainEvent {
     this.state = TxState.COMPENSATED_FAILED;
   }
 
+  public UpdateTxEventDomain(CompensateAckTimeoutEvent event) {
+    this.event = event;
+    this.parentTxId = event.getParentTxId();
+    this.localTxId = event.getLocalTxId();
+    this.throwablePayLoads = event.getPayloads();
+    this.state = TxState.COMPENSATED_FAILED;
+  }
+
   public String getParentTxId() {
     return parentTxId;
   }

Reply via email to