This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 2357d138d00832df634c74f8d5b06f4045d89a1a Author: zhang2014 <cos...@gmail.com> AuthorDate: Sun Jan 14 16:44:30 2018 +0800 SCB-224 omega support retries event --- .../saga/alpha/core/CompositeOmegaCallback.java | 43 +++------- .../servicecomb/saga/alpha/core/OmegaCallback.java | 5 +- .../saga/alpha/core/PushBackOmegaCallback.java | 10 --- .../saga/alpha/core/TxConsistentService.java | 1 - .../servicecomb/saga/alpha/core/TxEvent.java | 8 +- .../saga/alpha/server/GrpcOmegaCallback.java | 12 --- .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 + .../saga/alpha/server/TxEventEnvelope.java | 96 ++++++++++++++++++++++ .../src/main/resources/schema-mysql.sql | 2 + .../saga/omega/context/CompensationContext.java | 16 ++-- .../saga/omega/context/OmegaContext.java | 2 + .../spring/TransactionAspectConfig.java | 4 +- .../omega/transaction/CompensableInterceptor.java | 10 ++- .../transaction/CompensationMessageHandler.java | 11 ++- .../CompensationMessageHandlerTest.java | 4 +- 15 files changed, 151 insertions(+), 75 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java index b1c7e09..32e5102 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java @@ -17,9 +17,9 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.Collections.emptyMap; + import java.util.Map; -import java.util.Objects; -import java.util.Optional; public class CompositeOmegaCallback implements OmegaCallback { private final Map<String, Map<String, OmegaCallback>> callbacks; @@ -29,44 +29,23 @@ public class CompositeOmegaCallback implements OmegaCallback { } @Override - public void retries(TxEvent event) { - OmegaCallback omegaCallback = callbackFor(event); 
+ public void compensate(TxEvent event) { + Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap()); - try { - omegaCallback.retries(event); - } catch (Exception e) { - removeEventCallback(event, omegaCallback); - throw e; + if (serviceCallbacks.isEmpty()) { + throw new AlphaException("No such omega callback found for service " + event.serviceName()); } - } - @Override - public void compensate(TxEvent event) { - OmegaCallback omegaCallback = callbackFor(event); + OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId()); + if (omegaCallback == null) { + omegaCallback = serviceCallbacks.values().iterator().next(); + } try { omegaCallback.compensate(event); } catch (Exception e) { - removeEventCallback(event, omegaCallback); + serviceCallbacks.values().remove(omegaCallback); throw e; } } - - private OmegaCallback callbackFor(String instanceId, Map<String, OmegaCallback> serviceCallbacks) { - OmegaCallback omegaCallback = serviceCallbacks.get(instanceId); - if (Objects.isNull(omegaCallback)) { - return serviceCallbacks.values().iterator().next(); - } - return omegaCallback; - } - - private OmegaCallback callbackFor(TxEvent event) { - return Optional.ofNullable(callbacks.get(event.serviceName())).filter(callbacks -> !callbacks.isEmpty()) - .map(serviceCallbacks -> callbackFor(event.instanceId(), serviceCallbacks)) - .orElseThrow(() -> new AlphaException("No such omega callback found for service " + event.serviceName())); - } - - private void removeEventCallback(TxEvent event, OmegaCallback omegaCallback) { - callbacks.get(event.serviceName()).values().remove(omegaCallback); - } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java index d926ed0..c7dfbbb 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java +++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java @@ -18,10 +18,7 @@ package org.apache.servicecomb.saga.alpha.core; public interface OmegaCallback { - void retries(TxEvent event); - void compensate(TxEvent event); - default void disconnect() { - } + default void disconnect() {} } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java index 290eb20..3b27c14 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java @@ -35,16 +35,6 @@ public class PushBackOmegaCallback implements OmegaCallback { } @Override - public void retries(TxEvent event) { - try { - underlying.compensate(event); - } catch (Exception e) { - logError(event, e); - pendingCompensations.offer(() -> compensate(event)); - } - } - - @Override public void compensate(TxEvent event) { try { underlying.compensate(event); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index a9a3ed2..b519c1b 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -29,7 +29,6 @@ import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; - public class TxConsistentService { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 5426e8b..1abb7fe 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -87,7 +87,7 @@ public class TxEvent { int timeout, byte[] payloads) { this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout, - payloads); + "", 0, payloads); } public TxEvent( @@ -102,7 +102,7 @@ public class TxEvent { int timeout, byte[] payloads) { this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, - timeout, payloads); + timeout, "", 0, payloads); } TxEvent(Long surrogateId, @@ -132,6 +132,8 @@ public class TxEvent { String type, String compensationMethod, Date expiryTime, + String retriesMethod, + int retries, byte[] payloads) { this.surrogateId = surrogateId; this.serviceName = serviceName; @@ -143,6 +145,8 @@ public class TxEvent { this.type = type; this.compensationMethod = compensationMethod; this.expiryTime = expiryTime; + this.retriesMethod = retriesMethod; + this.retries = retries; this.payloads = payloads; } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java index 83a6b9d..5a95281 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java @@ -37,18 +37,6 @@ class GrpcOmegaCallback implements OmegaCallback { } @Override - public void retries(TxEvent event) { - GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder() - 
.setGlobalTxId(event.globalTxId()) - .setLocalTxId(event.localTxId()) - .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId()) - .setCompensateMethod(event.retriesMethod()) - .setPayloads(ByteString.copyFrom(event.payloads())) - .build(); - observer.onNext(command); - } - - @Override public void compensate(TxEvent event) { GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder() .setGlobalTxId(event.globalTxId()) diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index ee7e2e4..99457a2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -85,6 +85,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { message.getType(), message.getCompensationMethod(), message.getTimeout(), + message.getRetriesMethod(), + message.getRetries(), message.getPayloads().toByteArray() )); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java new file mode 100644 index 0000000..7d93462 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import java.util.Date; + +import javax.persistence.Embedded; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; + +import org.apache.servicecomb.saga.alpha.core.TxEvent; + +@Entity class TxEventEnvelope { + @Id + @GeneratedValue + private long surrogateId; + + @Embedded + private TxEvent event; + + private TxEventEnvelope() { + } + + TxEventEnvelope(TxEvent event) { + this.event = event; + } + + public TxEventEnvelope( + String serviceName, String instanceId, String globalTxId, + String localTxId, String parentTxId, String type, String compensationMethod, + String retriesMethod, int retries, byte[] payloads) { + this.event = new TxEvent( + serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, + compensationMethod, retriesMethod, retries, payloads); + } + + String serviceName() { + return event.serviceName(); + } + + String instanceId() { + return event.instanceId(); + } + + public long creationTime() { + return event.creationTime().getTime(); + } + + String globalTxId() { + return event.globalTxId(); + } + + String localTxId() { + return event.localTxId(); + } + + String parentTxId() { + return event.parentTxId(); + } + + String type() { + return event.type(); + } + + String compensationMethod() { + return event.compensationMethod(); + } + + byte[] payloads() { + return event.payloads(); + } + + int retries() { + return event.retries(); + } + + TxEvent event() { + return event; + } +} diff --git 
a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql index b0bc8d7..c21e518 100644 --- a/alpha/alpha-server/src/main/resources/schema-mysql.sql +++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql @@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent ( type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + retries_method varchar(256) NOT NULL, + retries int NOT NULL, payloads varbinary(10240), PRIMARY KEY (surrogateId), INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime) diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java index 067af92..8d9eb7e 100644 --- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java +++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java @@ -28,15 +28,20 @@ import org.slf4j.LoggerFactory; public class CompensationContext { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>(); + + private final Map<String, TransactionContextInternal> contexts = new ConcurrentHashMap<>(); + + public CompensationContext() { + } public void addCompensationContext(Method compensationMethod, Object target) { compensationMethod.setAccessible(true); - contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod)); + contexts.put(compensationMethod.toString(), + new TransactionContextInternal(target, compensationMethod)); } public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... 
payloads) { - CompensationContextInternal contextInternal = contexts.get(compensationMethod); + TransactionContextInternal contextInternal = contexts.get(compensationMethod); try { contextInternal.compensationMethod.invoke(contextInternal.target, payloads); @@ -49,11 +54,12 @@ public class CompensationContext { } } - private static final class CompensationContextInternal { + private static final class TransactionContextInternal { private final Object target; + private final Method compensationMethod; - private CompensationContextInternal(Object target, Method compensationMethod) { + private TransactionContextInternal(Object target, Method compensationMethod) { this.target = target; this.compensationMethod = compensationMethod; } diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java index daa8e7c..ac9f02c 100644 --- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java @@ -17,6 +17,8 @@ package org.apache.servicecomb.saga.omega.context; +import java.util.Map; + public class OmegaContext { public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id"; public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id"; diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java index 5358db5..41be8f7 100644 --- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java +++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java @@ -34,8 +34,8 @@ 
import org.springframework.core.annotation.Order; public class TransactionAspectConfig { @Bean - MessageHandler messageHandler(MessageSender sender, CompensationContext context) { - return new CompensationMessageHandler(sender, context); + MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) { + return new CompensationMessageHandler(sender, omegaContext, context); } @Order(0) diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java index fcce034..d3e2bd1 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java @@ -18,14 +18,17 @@ package org.apache.servicecomb.saga.omega.transaction; import org.apache.servicecomb.saga.omega.context.OmegaContext; +import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable; class CompensableInterceptor implements EventAwareInterceptor { + private final OmegaContext context; + private final MessageSender sender; CompensableInterceptor(OmegaContext context, MessageSender sender) { - this.context = context; this.sender = sender; + this.context = context; } @Override @@ -37,12 +40,11 @@ class CompensableInterceptor implements EventAwareInterceptor { @Override public void postIntercept(String parentTxId, String compensationMethod) { sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod)); - } @Override public void onError(String parentTxId, String compensationMethod, Throwable throwable) { - sender.send( - new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable)); + sender.send(new 
TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, + throwable)); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java index 46c1e9b..15cf91a 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java @@ -18,19 +18,26 @@ package org.apache.servicecomb.saga.omega.transaction; import org.apache.servicecomb.saga.omega.context.CompensationContext; +import org.apache.servicecomb.saga.omega.context.OmegaContext; public class CompensationMessageHandler implements MessageHandler { private final MessageSender sender; + private final OmegaContext omegaContext; private final CompensationContext context; - public CompensationMessageHandler(MessageSender sender, CompensationContext context) { + public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) { this.sender = sender; this.context = context; + this.omegaContext = omegaContext; } @Override - public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { + public void onReceive(String globalTxId, String localTxId, String parentTxId, + String compensationMethod, Object... 
payloads) { + String oldLocalTxId = omegaContext.localTxId(); + omegaContext.setLocalTxId(parentTxId); context.compensate(globalTxId, localTxId, compensationMethod, payloads); sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod)); + omegaContext.setLocalTxId(oldLocalTxId); } } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java index 0b33d4b..c9c7394 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.CompensationContext; +import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.junit.Test; public class CompensationMessageHandlerTest { @@ -44,8 +45,9 @@ public class CompensationMessageHandlerTest { private final String compensationMethod = getClass().getCanonicalName(); private final String payload = uniquify("blah"); + private final OmegaContext omegaContext = mock(OmegaContext.class); private final CompensationContext context = mock(CompensationContext.class); - private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context); + private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context); @Test public void sendsEventOnCompensationCompleted() throws Exception { -- To stop receiving notification emails like this one, please contact ningji...@apache.org.