xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1693481471
########## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ########## @@ -0,0 +1,206 @@ +package org.apache.helix.gateway.participant; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.constant.MessageType; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +public class HelixGatewayParticipant implements HelixGatewayMultiTopStateStateTransitionProcessor { Review Comment: nit: doc ########## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ########## @@ -73,17 +75,20 @@ public void onCompleted() { /** * Send state transition message to the instance. * The instance must already have established a connection to the gateway service. - * @param instanceName - * @return + * @param instanceName the instance name to send the message to + * @param messageType the type of the message + * @param message the message to convert to the transition message */ @Override - public boolean sendStateTransitionMessage(String instanceName) { + public void sendStateTransitionMessage(String instanceName, MessageType messageType, Review Comment: Maybe we do not need to pass the MessageType here. we could read the from and f=to state In translation util. Message type also confuses with org.apache.helix.model.Message.MessageType. ########## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ########## @@ -0,0 +1,206 @@ +package org.apache.helix.gateway.participant; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.constant.MessageType; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +public class HelixGatewayParticipant implements HelixGatewayMultiTopStateStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { + _gatewayServiceProcessor = gatewayServiceProcessor; + _participantManager = participantManager; + _shardStateMap = initialShardStateMap; + _stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processMultiTopStateModelStateTransitionMessage(Message message) throws Exception { + String transitionId = message.getMsgId(); + String resourceId = message.getResourceName(); + String shardId = message.getPartitionName(); + String toState = message.getToState(); + + try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { + return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage( + _participantManager.getInstanceName(), + determineTransitionType(resourceId, shardId, toState), message); + + boolean success = future.get(); + if (!success) { + throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); + } finally { + _stateTransitionResultMap.remove(transitionId); + } + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { + // Remove the stateTransitionResultMap future for the message + String transitionId = message.getMsgId(); + String resourceId = message.getResourceName(); + String shardId = message.getPartitionName(); + + // Remove the future from the stateTransitionResultMap since we are no longer able + // to process the state transition due to participant manager either timing out + // or failing to process the state transition + _stateTransitionResultMap.remove(transitionId); + + // Set the replica state to ERROR + updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + + // Notify the HelixGatewayParticipantClient that it is in ERROR state + // TODO: We need a better way than sending the state transition with a toState of ERROR + } + + /** + * Completes the state transition with the given transitionId. + * + * @param transitionId the transitionId to complete + * @param isSuccess whether the state transition was successful + */ + public void completeStateTransition(String transitionId, boolean isSuccess) { + CompletableFuture<Boolean> future = _stateTransitionResultMap.get(transitionId); + if (future != null) { + future.complete(isSuccess); + } + } + + private boolean isCurrentStateAlreadyTarget(String resourceId, String shardId, + String targetState) { + return (_shardStateMap.containsKey(resourceId) && _shardStateMap.get(resourceId) + .containsKey(shardId) && _shardStateMap.get(resourceId).get(shardId).equals(targetState)) + || targetState.equals(HelixDefinedState.DROPPED.name()); + } + + private MessageType determineTransitionType(String resourceId, String shardId, Review Comment: maybe move to util? Then it can be shared with the translator util as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org