This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 819f19a8adbffde7e3eb025ea8b8a359416077eb
Author: cherrylzhao <zhaoju...@126.com>
AuthorDate: Wed Aug 22 21:52:13 2018 +0800

    SCB-856 Implement alpha TCC workflow based on local memory.
---
 .../alpha/tcc/server/GrpcOmegaTccCallback.java     |  48 +++++++++
 .../saga/alpha/tcc/server/GrpcTccEventService.java |  72 +++++++++++++
 .../saga/alpha/tcc/server/OmegaCallback.java       |  29 ++++++
 .../alpha/tcc/server/OmegaCallbacksRegistry.java   |  71 +++++++++++++
 .../alpha/tcc/server/TransactionEventRegistry.java |  55 ++++++++++
 .../alpha/tcc/server/event/ParticipateEvent.java   | 113 +++++++++++++++++++++
 .../tcc/server/event/ParticipateEventFactory.java  |  42 ++++++++
 7 files changed, 430 insertions(+)

diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
new file mode 100644
index 0000000..e05b024
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ *
+ * @author zhaojun
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+
+  private StreamObserver<GrpcTccCordinateCommand> responseObserver;
+
+  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCordinateCommand> 
responseObserver) {
+    this.responseObserver = responseObserver;
+  }
+
+  @Override
+  public void compensate(ParticipateEvent event, TransactionStatus status) {
+    GrpcTccCordinateCommand command = GrpcTccCordinateCommand.newBuilder()
+        .setGlobalTxId(event.getGlobalTxId())
+        .setLocalTxId(event.getLocalTxId())
+        .setParentTxId(event.getParentTxId() == null ? "" : 
event.getParentTxId())
+        .setMethod(TransactionStatus.Succeed.equals(status) ? 
event.getConfirmMethod() : event.getCancelMethod())
+        .build();
+    responseObserver.onNext(command);
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
new file mode 100644
index 0000000..a76530b
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import 
org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import 
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+
+/**
+ * Grpc TCC event service implement.
+ *
+ * @author zhaojun
+ */
+public class GrpcTccEventService extends 
TccEventServiceGrpc.TccEventServiceImplBase {
+
+  private static final GrpcAck ALLOW = 
GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = 
GrpcAck.newBuilder().setAborted(true).build();
+
+  @Override
+  public void onConnected(GrpcServiceConfig request, 
StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    OmegaCallbacksRegistry.register(request, responseObserver);
+  }
+
+  @Override
+  public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+  }
+
+  @Override
+  public void participate(GrpcTccParticipateEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+    TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, 
StreamObserver<GrpcAck> responseObserver) {
+    for (ParticipateEvent event : 
TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+      OmegaCallbacksRegistry.retrieve(event.getServiceName(), 
event.getInstanceId()).compensate(event, event.getStatus());
+    }
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onDisconnected(GrpcServiceConfig request, 
StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), 
request.getInstanceId()).disconnect();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
new file mode 100644
index 0000000..14f8842
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public interface OmegaCallback {
+
+  void compensate(ParticipateEvent event, TransactionStatus status);
+
+  default void disconnect() {
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
new file mode 100644
index 0000000..c505df1
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Manage Omega callbacks.
+ *
+ * @author zhaojun
+ */
+public final class OmegaCallbacksRegistry {
+
+  private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new 
ConcurrentHashMap<>();
+
+  /**
+   * Register omega TCC callback.
+   *
+   * @param request Grpc service config
+   * @param responseObserver stream observer
+   */
+  public static void register(GrpcServiceConfig request, 
StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    REGISTRY
+        .computeIfAbsent(request.getServiceName(), key -> new 
ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new 
GrpcOmegaTccCallback(responseObserver));
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback
+   */
+  public static OmegaCallback retrieve(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id, then remove 
it from registry.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback
+   */
+  public static OmegaCallback retrieveThenRemove(String serviceName, String 
instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).remove(instanceId);
+  }
+
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
new file mode 100644
index 0000000..b89967a
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+
+/**
+ * Manage TCC transaction event.
+ *
+ * @author zhaojun
+ */
+public final class TransactionEventRegistry {
+
+  private final static Map<String, List<ParticipateEvent>> REGISTRY = new 
ConcurrentHashMap<>();
+
+  /**
+   * Register participate event.
+   *
+   * @param participateEvent participate event
+   */
+  public static void register(ParticipateEvent participateEvent) {
+    REGISTRY
+        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new 
LinkedList<>())
+        .add(participateEvent);
+  }
+
+  /**
+   * Retrieve participate event from registry.
+   *
+   * @param globalTxId global transaction id
+   * @return participate events
+   */
+  public static List<ParticipateEvent> retrieve(String globalTxId) {
+    return REGISTRY.get(globalTxId);
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
new file mode 100644
index 0000000..66182c6
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+/**
+ * Participate event.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEvent {
+
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+  private String serviceName;
+  private String instanceId;
+  private String confirmMethod;
+  private String cancelMethod;
+  private TransactionStatus status;
+
+  public ParticipateEvent(String globalTxId, String localTxId, String 
parentTxId, String serviceName,
+      String instanceId, String confirmMethod, String cancelMethod, 
TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.confirmMethod = confirmMethod;
+    this.cancelMethod = cancelMethod;
+    this.status = status;
+  }
+
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  public String getConfirmMethod() {
+    return confirmMethod;
+  }
+
+  public void setConfirmMethod(String confirmMethod) {
+    this.confirmMethod = confirmMethod;
+  }
+
+  public String getCancelMethod() {
+    return cancelMethod;
+  }
+
+  public void setCancelMethod(String cancelMethod) {
+    this.cancelMethod = cancelMethod;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public TransactionStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TransactionStatus status) {
+    this.status = status;
+  }
+}
diff --git 
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
new file mode 100644
index 0000000..d876acf
--- /dev/null
+++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+
+/**
+ * Participate event factory.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEventFactory {
+
+  public static ParticipateEvent create(GrpcTccParticipateEvent request) {
+    return new ParticipateEvent(
+        request.getGlobalTxId(),
+        request.getLocalTxId(),
+        request.getParentTxId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
+        request.getServiceName(),
+        request.getInstanceId(),
+        TransactionStatus.valueOf(request.getStatus())
+    );
+  }
+}

Reply via email to