xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1693481471


##########
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.participant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import 
org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.constant.MessageType;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+public class HelixGatewayParticipant implements 
HelixGatewayMultiTopStateStateTransitionProcessor {

Review Comment:
   nit: please add a class-level Javadoc comment describing this class.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##########
@@ -73,17 +75,20 @@ public void onCompleted() {
   /**
    * Send state transition message to the instance.
    * The instance must already have established a connection to the gateway 
service.
-   * @param instanceName
-   * @return
+   * @param instanceName the instance name to send the message to
+   * @param messageType the type of the message
+   * @param message the message to convert to the transition message
    */
   @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
+  public void sendStateTransitionMessage(String instanceName, MessageType 
messageType,

Review Comment:
   Maybe we do not need to pass the MessageType here. We could read the from 
and to states in the translation util. MessageType is also easily confused with 
org.apache.helix.model.Message.MessageType. 



##########
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.participant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import 
org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.constant.MessageType;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+public class HelixGatewayParticipant implements 
HelixGatewayMultiTopStateStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map<String, Map<String, String>> _shardStateMap;
+  private final Map<String, CompletableFuture<Boolean>> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+      HelixManager participantManager, Map<String, Map<String, String>> 
initialShardStateMap) {
+    _gatewayServiceProcessor = gatewayServiceProcessor;
+    _participantManager = participantManager;
+    _shardStateMap = initialShardStateMap;
+    _stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processMultiTopStateModelStateTransitionMessage(Message message) 
throws Exception {
+    String transitionId = message.getMsgId();
+    String resourceId = message.getResourceName();
+    String shardId = message.getPartitionName();
+    String toState = message.getToState();
+
+    try {
+      if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+        return;
+      }
+
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      _stateTransitionResultMap.put(transitionId, future);
+      _gatewayServiceProcessor.sendStateTransitionMessage(
+          _participantManager.getInstanceName(),
+          determineTransitionType(resourceId, shardId, toState), message);
+
+      boolean success = future.get();
+      if (!success) {
+        throw new Exception("Failed to transition to state " + toState);
+      }
+
+      updateState(resourceId, shardId, toState);
+    } finally {
+      _stateTransitionResultMap.remove(transitionId);
+    }
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+    // Remove the stateTransitionResultMap future for the message
+    String transitionId = message.getMsgId();
+    String resourceId = message.getResourceName();
+    String shardId = message.getPartitionName();
+
+    // Remove the future from the stateTransitionResultMap since we are no 
longer able
+    // to process the state transition due to participant manager either 
timing out
+    // or failing to process the state transition
+    _stateTransitionResultMap.remove(transitionId);
+
+    // Set the replica state to ERROR
+    updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+    // Notify the HelixGatewayParticipantClient that it is in ERROR state
+    // TODO: We need a better way than sending the state transition with a 
toState of ERROR
+  }
+
+  /**
+   * Completes the state transition with the given transitionId.
+   *
+   * @param transitionId the transitionId to complete
+   * @param isSuccess    whether the state transition was successful
+   */
+  public void completeStateTransition(String transitionId, boolean isSuccess) {
+    CompletableFuture<Boolean> future = 
_stateTransitionResultMap.get(transitionId);
+    if (future != null) {
+      future.complete(isSuccess);
+    }
+  }
+
+  private boolean isCurrentStateAlreadyTarget(String resourceId, String 
shardId,
+      String targetState) {
+    return (_shardStateMap.containsKey(resourceId) && 
_shardStateMap.get(resourceId)
+        .containsKey(shardId) && 
_shardStateMap.get(resourceId).get(shardId).equals(targetState))
+        || targetState.equals(HelixDefinedState.DROPPED.name());
+  }
+
+  private MessageType determineTransitionType(String resourceId, String 
shardId,

Review Comment:
   Maybe move this to a util class? Then it can be shared with the translator util as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to