This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 819f19a8adbffde7e3eb025ea8b8a359416077eb Author: cherrylzhao <zhaoju...@126.com> AuthorDate: Wed Aug 22 21:52:13 2018 +0800 SCB-856 Implement alpha TCC workflow based on local memory. --- .../alpha/tcc/server/GrpcOmegaTccCallback.java | 48 +++++++++ .../saga/alpha/tcc/server/GrpcTccEventService.java | 72 +++++++++++++ .../saga/alpha/tcc/server/OmegaCallback.java | 29 ++++++ .../alpha/tcc/server/OmegaCallbacksRegistry.java | 71 +++++++++++++ .../alpha/tcc/server/TransactionEventRegistry.java | 55 ++++++++++ .../alpha/tcc/server/event/ParticipateEvent.java | 113 +++++++++++++++++++++ .../tcc/server/event/ParticipateEventFactory.java | 42 ++++++++ 7 files changed, 430 insertions(+) diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java new file mode 100644 index 0000000..e05b024 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ *
+ * <p>Wraps the command stream of one connected omega and pushes a coordinate command
+ * onto it for every participate event that has to be confirmed or cancelled.
+ *
+ * <p>gRPC stream observers are not thread-safe, so every write to the wrapped observer
+ * is serialized on the observer itself.
+ *
+ * @author zhaojun
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+
+  private final StreamObserver<GrpcTccCordinateCommand> responseObserver;
+
+  /**
+   * @param responseObserver command stream of the omega this callback talks to
+   */
+  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    this.responseObserver = responseObserver;
+  }
+
+  /**
+   * Send a coordinate command for the given participate event.
+   *
+   * @param event participate event supplying the transaction ids and the method names
+   * @param status when {@code Succeed} the confirm method is sent, otherwise the cancel method
+   */
+  @Override
+  public void compensate(ParticipateEvent event, TransactionStatus status) {
+    String method = TransactionStatus.Succeed.equals(status)
+        ? event.getConfirmMethod()
+        : event.getCancelMethod();
+    // Protobuf builders reject null, so a missing parent id is sent as an empty string.
+    String parentTxId = event.getParentTxId() == null ? "" : event.getParentTxId();
+
+    GrpcTccCordinateCommand command = GrpcTccCordinateCommand.newBuilder()
+        .setGlobalTxId(event.getGlobalTxId())
+        .setLocalTxId(event.getLocalTxId())
+        .setParentTxId(parentTxId)
+        .setMethod(method)
+        .build();
+
+    // Several transactions may end at the same time and target the same omega.
+    synchronized (responseObserver) {
+      responseObserver.onNext(command);
+    }
+  }
+
+  /**
+   * Complete the command stream so that it is released once the omega has disconnected.
+   */
+  @Override
+  public void disconnect() {
+    synchronized (responseObserver) {
+      responseObserver.onCompleted();
+    }
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
new file mode 100644
index 0000000..a76530b
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server; + +import io.grpc.stub.StreamObserver; +import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent; +import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc; + +/** + * Grpc TCC event service implement. 
+ *
+ * <p>Every unary call is answered with exactly one {@link GrpcAck} and then completed,
+ * so that callers are never left waiting for a reply.
+ *
+ * @author zhaojun
+ */
+public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
+
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
+
+  /**
+   * Register the command stream of a newly connected omega.
+   * The stream is kept open; it is only written to when a transaction ends.
+   */
+  @Override
+  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    OmegaCallbacksRegistry.register(request, responseObserver);
+  }
+
+  /**
+   * Acknowledge the start of a TCC transaction.
+   * Nothing is recorded for this event, but the call still has to be completed.
+   */
+  @Override
+  public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    reply(responseObserver, ALLOW);
+  }
+
+  /**
+   * Record a participate event under its global transaction id.
+   */
+  @Override
+  public void participate(GrpcTccParticipateEvent request, StreamObserver<GrpcAck> responseObserver) {
+    TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+    reply(responseObserver, ALLOW);
+  }
+
+  /**
+   * Notify the omega of every participant registered for the ended transaction.
+   *
+   * <p>A transaction without participants is acknowledged as is. When the omega of a
+   * participant is not connected, the remaining participants are still notified and the
+   * call is answered with an aborted ack instead of failing with a NullPointerException.
+   */
+  @Override
+  public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    boolean allNotified = true;
+    java.util.List<ParticipateEvent> events = TransactionEventRegistry.retrieve(request.getGlobalTxId());
+    if (events != null) {
+      for (ParticipateEvent event : events) {
+        OmegaCallback callback = OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId());
+        if (callback == null) {
+          allNotified = false;
+          continue;
+        }
+        // NOTE(review): the status recorded at participate time decides between confirm and
+        // cancel here; confirm whether the status of the ended event should be used instead.
+        callback.compensate(event, event.getStatus());
+      }
+    }
+    reply(responseObserver, allNotified ? ALLOW : REJECT);
+  }
+
+  /**
+   * Drop the callback of a disconnecting omega.
+   * Disconnecting an unknown or already removed instance is acknowledged as well.
+   */
+  @Override
+  public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallback callback =
+        OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
+    if (callback != null) {
+      callback.disconnect();
+    }
+    reply(responseObserver, ALLOW);
+  }
+
+  /** Send a single ack and complete the unary call. */
+  private static void reply(StreamObserver<GrpcAck> responseObserver, GrpcAck ack) {
+    responseObserver.onNext(ack);
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
new file mode 100644
index 0000000..14f8842
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+/**
+ * Channel from alpha back to one omega instance.
+ */
+public interface OmegaCallback {
+
+  /**
+   * Ask the omega to follow up on a participate event.
+   *
+   * @param event participate event to follow up on
+   * @param status transaction status the follow-up is based on
+   */
+  void compensate(ParticipateEvent event, TransactionStatus status);
+
+  /**
+   * Called when the omega goes away. Does nothing unless an implementation overrides it.
+   */
+  default void disconnect() {
+    // no-op by default
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
new file mode 100644
index 0000000..c505df1
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Manage Omega callbacks.
+ *
+ * <p>Callbacks are kept in memory, keyed by service name and then by instance id.
+ *
+ * @author zhaojun
+ */
+public final class OmegaCallbacksRegistry {
+
+  private static final Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
+
+  private OmegaCallbacksRegistry() {
+    // static registry, not meant to be instantiated
+  }
+
+  /**
+   * Register omega TCC callback.
+   * A callback already registered for the same service name and instance id is replaced.
+   *
+   * @param request Grpc service config
+   * @param responseObserver stream observer
+   */
+  public static void register(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    REGISTRY
+        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new GrpcOmegaTccCallback(responseObserver));
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback, or {@code null} when none is registered
+   */
+  public static OmegaCallback retrieve(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id, then remove it from registry.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback, or {@code null} when none is registered
+   */
+  public static OmegaCallback retrieveThenRemove(String serviceName, String instanceId) {
+    // Look the inner map up directly instead of calling remove on the immutable empty default.
+    Map<String, OmegaCallback> callbacks = REGISTRY.get(serviceName);
+    return callbacks == null ? null : callbacks.remove(instanceId);
+  }
+
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
new file mode 100644
index 0000000..b89967a
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+
+/**
+ * Manage TCC transaction event.
+ *
+ * <p>Events are kept in memory, grouped by global transaction id. Registration may happen
+ * from several request threads at once, so each per-transaction list is a concurrent list.
+ *
+ * @author zhaojun
+ */
+public final class TransactionEventRegistry {
+
+  private static final Map<String, List<ParticipateEvent>> REGISTRY = new ConcurrentHashMap<>();
+
+  private TransactionEventRegistry() {
+    // static registry, not meant to be instantiated
+  }
+
+  /**
+   * Register participate event.
+   *
+   * @param participateEvent participate event
+   */
+  public static void register(ParticipateEvent participateEvent) {
+    // Fully qualified because the import block of this file is left untouched.
+    REGISTRY
+        .computeIfAbsent(participateEvent.getGlobalTxId(),
+            key -> new java.util.concurrent.CopyOnWriteArrayList<>())
+        .add(participateEvent);
+  }
+
+  /**
+   * Retrieve participate event from registry.
+   *
+   * @param globalTxId global transaction id
+   * @return read-only view of the participate events registered for the transaction;
+   *     empty (never {@code null}) when nothing was registered
+   */
+  public static List<ParticipateEvent> retrieve(String globalTxId) {
+    List<ParticipateEvent> events = REGISTRY.get(globalTxId);
+    if (events == null) {
+      return java.util.Collections.emptyList();
+    }
+    return java.util.Collections.unmodifiableList(events);
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
new file mode 100644
index 0000000..66182c6
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+/**
+ * Participate event.
+ *
+ * <p>Mutable holder for the transaction ids, the service instance, the confirm and cancel
+ * method names and the status of one participant.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEvent {
+
+  // transaction ids
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+
+  // service instance the event belongs to
+  private String serviceName;
+  private String instanceId;
+
+  // method names and status
+  private String confirmMethod;
+  private String cancelMethod;
+  private TransactionStatus status;
+
+  /**
+   * Create an event with every property set.
+   *
+   * @param globalTxId global transaction id
+   * @param localTxId local transaction id
+   * @param parentTxId parent transaction id
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @param confirmMethod confirm method name
+   * @param cancelMethod cancel method name
+   * @param status transaction status
+   */
+  public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String serviceName,
+      String instanceId, String confirmMethod, String cancelMethod, TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.confirmMethod = confirmMethod;
+    this.cancelMethod = cancelMethod;
+    this.status = status;
+  }
+
+  /** @return global transaction id */
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  /** @param globalTxId global transaction id */
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  /** @return local transaction id */
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  /** @param localTxId local transaction id */
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  /** @return parent transaction id */
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  /** @param parentTxId parent transaction id */
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  /** @return service name */
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  /** @param serviceName service name */
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  /** @return instance id */
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  /** @param instanceId instance id */
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  /** @return confirm method name */
+  public String getConfirmMethod() {
+    return confirmMethod;
+  }
+
+  /** @param confirmMethod confirm method name */
+  public void setConfirmMethod(String confirmMethod) {
+    this.confirmMethod = confirmMethod;
+  }
+
+  /** @return cancel method name */
+  public String getCancelMethod() {
+    return cancelMethod;
+  }
+
+  /** @param cancelMethod cancel method name */
+  public void setCancelMethod(String cancelMethod) {
+    this.cancelMethod = cancelMethod;
+  }
+
+  /** @return transaction status */
+  public TransactionStatus getStatus() {
+    return status;
+  }
+
+  /** @param status transaction status */
+  public void setStatus(TransactionStatus status) {
+    this.status = status;
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
new file mode 100644
index 0000000..d876acf
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+
+/**
+ * Participate event factory.
+ *
+ * <p>Converts the grpc participate message into the {@link ParticipateEvent} kept by alpha.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEventFactory {
+
+  /**
+   * Create a participate event from the grpc message.
+   *
+   * <p>The arguments follow the parameter order of the {@link ParticipateEvent} constructor:
+   * service name and instance id come before the confirm and cancel method names.
+   *
+   * @param request grpc participate event
+   * @return participate event carrying the values of the request
+   * @throws IllegalArgumentException if the request status is not a {@link TransactionStatus} name
+   */
+  public static ParticipateEvent create(GrpcTccParticipateEvent request) {
+    return new ParticipateEvent(
+        request.getGlobalTxId(),
+        request.getLocalTxId(),
+        request.getParentTxId(),
+        request.getServiceName(),
+        request.getInstanceId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
+        TransactionStatus.valueOf(request.getStatus())
+    );
+  }
+}