Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu merged PR #2845: URL: https://github.com/apache/helix/pull/2845 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on PR #2845: URL: https://github.com/apache/helix/pull/2845#issuecomment-2261544116 This PR is ready to be merged! Comment: - Add ST handling logic to support multi-top-state state model definitions without intermediary states - Encapsulate the participant manager create, connect, disconnect logic in HelixGatewayParticipant - Add replica state tracking in HelixGatewayParticipant -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1699157869 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,245 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + if (!future.get()) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); + +// Remove the future from the stateTransitionResultMap since we are no longer able +// to process the state transition due to participant manager either timing out +// or failing to process the state transition 
+_stateTransitionResultMap.remove(transitionId); + +// Set the replica state to ERROR +updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + +// Notify the HelixGatewayParticipantClient that it is in ERROR state +// TODO: We need a better way than sending the state transition with a toState of ERROR + } + + /** + * Get the instance name of the participant. + * + * @return participant instance name + */ + public String getInstanceName() { +return _participantManager.getInstanceName(); + } + + /** + * Completes the st
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1699157425 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,245 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + if (!future.get()) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); + +// Remove the future from the stateTransitionResultMap since we are no longer able +// to process the state transition due to participant manager either timing out +// or failing to process the state transition 
+_stateTransitionResultMap.remove(transitionId); + +// Set the replica state to ERROR +updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + +// Notify the HelixGatewayParticipantClient that it is in ERROR state +// TODO: We need a better way than sending the state transition with a toState of ERROR + } + + /** + * Get the instance name of the participant. + * + * @return participant instance name + */ + public String getInstanceName() { +return _participantManager.getInstanceName(); + } + + /** + * Completes the st
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on PR #2845: URL: https://github.com/apache/helix/pull/2845#issuecomment-2261448579 Had an offline discussion with @zpinto. LGTM. Please address the comments based on our discussion. :D -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697982618 ## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ## @@ -92,17 +94,20 @@ public void onCompleted() { /** * Send state transition message to the instance. * The instance must already have established a connection to the gateway service. - * @param instanceName - * @return + * + * @param instanceName the instance name to send the message to + * @param currentState the current state of shard + * @param message the message to convert to the transition message */ @Override - public boolean sendStateTransitionMessage(String instanceName) { + public void sendStateTransitionMessage(String instanceName, String currentState, Review Comment: The current state is needed to determine the TransitionType, i.e. whether the transition is an ADD, DELETE, or CHANGE_ROLE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697981290 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); Review Comment: It is necessary to figure out the TransitionType. It is used to see if it is ADD, DELETE, or CHANGE_ROLE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697859886 ## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ## @@ -92,17 +94,20 @@ public void onCompleted() { /** * Send state transition message to the instance. * The instance must already have established a connection to the gateway service. - * @param instanceName - * @return + * + * @param instanceName the instance name to send the message to + * @param currentState the current state of shard + * @param message the message to convert to the transition message */ @Override - public boolean sendStateTransitionMessage(String instanceName) { + public void sendStateTransitionMessage(String instanceName, String currentState, Review Comment: Why we need currentState here? ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,245 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + if (!future.get()) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); + +// Remove th
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697790847 ## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ## @@ -89,33 +91,47 @@ public void run() { * It includes waiting for ZK connection, and also wait for previous LiveInstance to expire. */ class participantConnectionProcessor implements Runnable { -GatewayServiceEvent _event; +private final GatewayServiceEvent _event; -public participantConnectionProcessor(GatewayServiceEvent event) { +private participantConnectionProcessor(GatewayServiceEvent event) { _event = event; } @Override public void run() { - HelixGatewayService helixGatewayService; - _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(), - k -> new HelixGatewayService(GatewayServiceManager.this, _event.getClusterName())); - helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { -helixGatewayService.registerParticipant(); +createHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName(), +_event.getShardStateMap()); } else { -helixGatewayService.deregisterParticipant(_event.getClusterName(), _event.getInstanceName()); +removeHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); } } } - @VisibleForTesting - HelixGatewayServiceGrpcService getGrpcService() { -return _grpcService; + private void createHelixGatewayParticipant(String clusterName, String instanceName, + Map> initialShardStateMap) { +// Create and add the participant to the participant map +HelixGatewayParticipant.Builder participantBuilder = +new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName, +_zkAddress).addInitialShardState(initialShardStateMap); +SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach( +participantBuilder::addMultiTopStateStateModelDefinition); +_helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>()) +.put(instanceName, participantBuilder.build()); + } + + private void removeHelixGatewayParticipant(String clusterName, String instanceName) { +// Disconnect and remove the participant from the participant map +HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName); +if (participant != null) { Review Comment: I think we need the getter to return null, otherwise what will it return if null check is there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697769176 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + boolean success = future.get(); + if (!success) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); + +// Remove the future from the stateTransitionResultMap since we are no longer able +// to process the state transition due to participant manager either timing out +// or failing to process the 
state transition +_stateTransitionResultMap.remove(transitionId); + +// Set the replica state to ERROR +updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + +// Notify the HelixGatewayParticipantClient that it is in ERROR state Review Comment: I think we should hold off on this for now. We may want to have a different type of message because it is not an actual ST to ERROR. Let's discuss it more and add implementation in a following PR. -- This is an automated message from the Apache Git Service. To respond to t
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
zpinto commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697768612 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); Review Comment: We don't send the from state, we are only sending the target state and the TransitionType so they have no exposure to the from state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697706402 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + boolean success = future.get(); + if (!success) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); + +// Remove the future from the stateTransitionResultMap since we are no longer able +// to process the state transition due to participant manager either timing out +// or failing to process the 
state transition +_stateTransitionResultMap.remove(transitionId); + +// Set the replica state to ERROR +updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + +// Notify the HelixGatewayParticipantClient that it is in ERROR state Review Comment: Shouldn't we call "_gatewayServiceProcessor.sendStateTransitionMessage" here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: review
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138 ## helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java: ## @@ -0,0 +1,60 @@ +package org.apache.helix.gateway.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@StateModelInfo(initialState = "OFFLINE", states = {}) +public class HelixGatewayMultiTopStateStateModel extends StateModel { + private static final Logger _logger = + LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class); + + private final HelixStateTransitionProcessor _stateTransitionProcessor; + + public HelixGatewayMultiTopStateStateModel( + HelixStateTransitionProcessor stateTransitionProcessor) { +_stateTransitionProcessor = stateTransitionProcessor; + } + + @Transition(to = "*", from = "*") + public void genericStateTransitionHandler(Message message, NotificationContext context) + throws Exception { +_stateTransitionProcessor.processStateTransitionMessage(message); + } + + @Override + public void reset() { +// no-op we don't want to start from init state again. + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, Review Comment: Please correct me if I am wrong. We are not using this method now in our first version, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138 ## helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java: ## @@ -0,0 +1,60 @@ +package org.apache.helix.gateway.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@StateModelInfo(initialState = "OFFLINE", states = {}) +public class HelixGatewayMultiTopStateStateModel extends StateModel { + private static final Logger _logger = + LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class); + + private final HelixStateTransitionProcessor _stateTransitionProcessor; + + public HelixGatewayMultiTopStateStateModel( + HelixStateTransitionProcessor stateTransitionProcessor) { +_stateTransitionProcessor = stateTransitionProcessor; + } + + @Transition(to = "*", from = "*") + public void genericStateTransitionHandler(Message message, NotificationContext context) + throws Exception { +_stateTransitionProcessor.processStateTransitionMessage(message); + } + + @Override + public void reset() { +// no-op we don't want to start from init state again. + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, Review Comment: Please correct me if I am wrong. We are not using this method now in our first version, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138 ## helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java: ## @@ -0,0 +1,60 @@ +package org.apache.helix.gateway.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@StateModelInfo(initialState = "OFFLINE", states = {}) +public class HelixGatewayMultiTopStateStateModel extends StateModel { + private static final Logger _logger = + LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class); + + private final HelixStateTransitionProcessor _stateTransitionProcessor; + + public HelixGatewayMultiTopStateStateModel( + HelixStateTransitionProcessor stateTransitionProcessor) { +_stateTransitionProcessor = stateTransitionProcessor; + } + + @Transition(to = "*", from = "*") + public void genericStateTransitionHandler(Message message, NotificationContext context) + throws Exception { +_stateTransitionProcessor.processStateTransitionMessage(message); + } + + @Override + public void reset() { +// no-op we don't want to start from init state again. + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, Review Comment: Please correct me if I am wrong. We are not invoking this method now in our first version, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697698123 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); Review Comment: Question: if a partition is being assigned to the instance, do we translate the Helix ST message "INIT->ONLINE" to "DROPPED->ONLINE"? That would be weird for the user to process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697690752 ## helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java: ## @@ -19,11 +19,22 @@ * under the License. */ +import org.apache.helix.model.Message; + /** - * Translate from/to GRPC function call to Helix Gateway Service event. + * Helix Gateway Service Processor interface allows sending state transition messages to + * participants through service implementing this interface. */ public interface HelixGatewayServiceProcessor { - public boolean sendStateTransitionMessage( String instanceName); + /** + * Send a state transition message to a remote participant. + * + * @param instanceName the name of the participant + * @param currentState the current state of the participant Review Comment: the current state of the shard? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1697682957 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,246 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. 
+ */ +public class HelixGatewayParticipant implements HelixStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + boolean success = future.get(); + if (!success) { +throw new Exception("Failed to transition to state " + toState); Review Comment: nit: if(future.get()) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1693698665 ## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ## @@ -89,33 +91,47 @@ public void run() { * It includes waiting for ZK connection, and also wait for previous LiveInstance to expire. */ class participantConnectionProcessor implements Runnable { -GatewayServiceEvent _event; +private final GatewayServiceEvent _event; -public participantConnectionProcessor(GatewayServiceEvent event) { +private participantConnectionProcessor(GatewayServiceEvent event) { _event = event; } @Override public void run() { - HelixGatewayService helixGatewayService; - _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(), - k -> new HelixGatewayService(GatewayServiceManager.this, _event.getClusterName())); - helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { -helixGatewayService.registerParticipant(); +createHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName(), +_event.getShardStateMap()); } else { -helixGatewayService.deregisterParticipant(_event.getClusterName(), _event.getInstanceName()); +removeHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); } } } - @VisibleForTesting - HelixGatewayServiceGrpcService getGrpcService() { -return _grpcService; + private void createHelixGatewayParticipant(String clusterName, String instanceName, + Map<String, Map<String, String>> initialShardStateMap) { +// Create and add the participant to the participant map +HelixGatewayParticipant.Builder participantBuilder = +new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName, +_zkAddress).addInitialShardState(initialShardStateMap); +SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach( +participantBuilder::addMultiTopStateStateModelDefinition); +_helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>()) +.put(instanceName, participantBuilder.build()); + } + + private void removeHelixGatewayParticipant(String clusterName, String instanceName) { +// Disconnect and remove the participant from the participant map +HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName); +if (participant != null) { Review Comment: can we have null check in getter? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org
Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]
xyuanlu commented on code in PR #2845: URL: https://github.com/apache/helix/pull/2845#discussion_r1693481471 ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,206 @@ +package org.apache.helix.gateway.participant; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.constant.MessageType; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +public class HelixGatewayParticipant implements HelixGatewayMultiTopStateStateTransitionProcessor { Review Comment: nit: doc ## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ## @@ -73,17 +75,20 @@ public void onCompleted() { /** * Send state transition message to the instance. * The instance must already have established a connection to the gateway service. - * @param instanceName - * @return + * @param instanceName the instance name to send the message to + * @param messageType the type of the message + * @param message the message to convert to the transition message */ @Override - public boolean sendStateTransitionMessage(String instanceName) { + public void sendStateTransitionMessage(String instanceName, MessageType messageType, Review Comment: Maybe we do not need to pass the MessageType here. We could read the from and to states in the translation util. 
MessageType can also be confused with org.apache.helix.model.Message.MessageType. ## helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java: ## @@ -0,0 +1,206 @@ +package org.apache.helix.gateway.participant; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.constant.MessageType; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +public class HelixGatewayParticipant implements HelixGatewayMultiTopStateStateTransitionProcessor { + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map<String, Map<String, String>> _shardStateMap; + private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { +_gatewayServiceProcessor = gatewayServiceProcessor; +_participantManager = participantManager; +_shardStateMap = initialShardStateMap; +_stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + @Override + public void processMultiTopStateModelStateTransitionMessage(Message message) throws Exception { +String transitionId = message.getMsgId(); +String resourceId = message.getResourceName(); +String shardId = message.getPartitionName(); +String toState = message.getToState(); + +try { + if 
(isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { +return; + } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage( + _participantManager.getInstanceName(), + determineTransitionType(resourceId, shardId, toState), message); + + boolean success = future.get(); + if (!success) { +throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); +} finally { + _stateTransitionResultMap.remove(transitionId); +} + } + + @Override + public void handleStateTransitionError(Message message, StateTransitionError error) { +// Remove the stateTransitionResultMap future for the message +String tra