Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-31 Thread via GitHub


xyuanlu merged PR #2845:
URL: https://github.com/apache/helix/pull/2845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-31 Thread via GitHub


zpinto commented on PR #2845:
URL: https://github.com/apache/helix/pull/2845#issuecomment-2261544116

   This PR is ready to be merged!
   
   Comment:
   -  Add ST handling logic to support multi top state state model definitions 
without intermediary states
   -  Encapsulate the participant manager create, connect, disconnect logic in 
HelixGatewayParticipant
   -  Add replica state tracking in HelixGatewayParticipant


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-31 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1699157869


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,245 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  if (!future.get()) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+
+// Remove the future from the stateTransitionResultMap since we are no 
longer able
+// to process the state transition due to participant manager either 
timing out
+// or failing to process the state transition
+_stateTransitionResultMap.remove(transitionId);
+
+// Set the replica state to ERROR
+updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+// Notify the HelixGatewayParticipantClient that it is in ERROR state
+// TODO: We need a better way than sending the state transition with a 
toState of ERROR
+  }
+
+  /**
+   * Get the instance name of the participant.
+   *
+   * @return participant instance name
+   */
+  public String getInstanceName() {
+return _participantManager.getInstanceName();
+  }
+
+  /**
+   * Completes the st

Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-31 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1699157425


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,245 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  if (!future.get()) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+
+// Remove the future from the stateTransitionResultMap since we are no 
longer able
+// to process the state transition due to participant manager either 
timing out
+// or failing to process the state transition
+_stateTransitionResultMap.remove(transitionId);
+
+// Set the replica state to ERROR
+updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+// Notify the HelixGatewayParticipantClient that it is in ERROR state
+// TODO: We need a better way than sending the state transition with a 
toState of ERROR
+  }
+
+  /**
+   * Get the instance name of the participant.
+   *
+   * @return participant instance name
+   */
+  public String getInstanceName() {
+return _participantManager.getInstanceName();
+  }
+
+  /**
+   * Completes the st

Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-31 Thread via GitHub


xyuanlu commented on PR #2845:
URL: https://github.com/apache/helix/pull/2845#issuecomment-2261448579

   Had an offline discussion with @zpinto. 
   
   LGTM, Please address comments based on our discussion. :D 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697982618


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -92,17 +94,20 @@ public void onCompleted() {
   /**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway 
service.
-   * @param instanceName
-   * @return
+   *
+   * @param instanceName the instance name to send the message to
+   * @param currentState the current state of shard
+   * @param message the message to convert to the transition message
*/
   @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
+  public void sendStateTransitionMessage(String instanceName, String 
currentState,

Review Comment:
   It is necessary to figure out the TransitionType. It is used to see if it is 
ADD, DELETE, or CHANGE_ROLE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697981290


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);

Review Comment:
   It is necessary to figure out the TransitionType. It is used to see if it is 
ADD, DELETE, or CHANGE_ROLE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697859886


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -92,17 +94,20 @@ public void onCompleted() {
   /**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway 
service.
-   * @param instanceName
-   * @return
+   *
+   * @param instanceName the instance name to send the message to
+   * @param currentState the current state of shard
+   * @param message the message to convert to the transition message
*/
   @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
+  public void sendStateTransitionMessage(String instanceName, String 
currentState,

Review Comment:
   Why we need currentState here? 



##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,245 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  if (!future.get()) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+
+// Remove th

Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697790847


##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -89,33 +91,47 @@ public void run() {
* It includes waiting for ZK connection, and also wait for previous 
LiveInstance to expire.
*/
   class participantConnectionProcessor implements Runnable {
-GatewayServiceEvent _event;
+private final GatewayServiceEvent _event;
 
-public participantConnectionProcessor(GatewayServiceEvent event) {
+private participantConnectionProcessor(GatewayServiceEvent event) {
   _event = event;
 }
 
 @Override
 public void run() {
-  HelixGatewayService helixGatewayService;
-  _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(),
-  k -> new HelixGatewayService(GatewayServiceManager.this, 
_event.getClusterName()));
-  helixGatewayService = 
_helixGatewayServiceMap.get(_event.getClusterName());
   if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
-helixGatewayService.registerParticipant();
+createHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName(),
+_event.getShardStateMap());
   } else {
-helixGatewayService.deregisterParticipant(_event.getClusterName(), 
_event.getInstanceName());
+removeHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
   }
 }
   }
 
-  @VisibleForTesting
-  HelixGatewayServiceGrpcService getGrpcService() {
-return _grpcService;
+  private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
+  Map> initialShardStateMap) {
+// Create and add the participant to the participant map
+HelixGatewayParticipant.Builder participantBuilder =
+new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, 
instanceName, clusterName,
+_zkAddress).addInitialShardState(initialShardStateMap);
+SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
+participantBuilder::addMultiTopStateStateModelDefinition);
+_helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>())
+.put(instanceName, participantBuilder.build());
+  }
+
+  private void removeHelixGatewayParticipant(String clusterName, String 
instanceName) {
+// Disconnect and remove the participant from the participant map
+HelixGatewayParticipant participant = 
getHelixGatewayParticipant(clusterName, instanceName);
+if (participant != null) {

Review Comment:
   I think we need the getter to return null, otherwise what will it return if 
null check is there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697769176


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  boolean success = future.get();
+  if (!success) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+
+// Remove the future from the stateTransitionResultMap since we are no 
longer able
+// to process the state transition due to participant manager either 
timing out
+// or failing to process the state transition
+_stateTransitionResultMap.remove(transitionId);
+
+// Set the replica state to ERROR
+updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+// Notify the HelixGatewayParticipantClient that it is in ERROR state

Review Comment:
   I think we should hold off on this for now. We may want to have a different 
type of message because it is not an actual ST to ERROR. Let's discuss it more 
and add implementation in a following PR.



-- 
This is an automated message from the Apache Git Service.
To respond to t

Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


zpinto commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697768612


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);

Review Comment:
   We don't send the from state, we are only sending the target state and the 
TransitionType so they have no exposure to the from state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697706402


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  boolean success = future.get();
+  if (!success) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+
+// Remove the future from the stateTransitionResultMap since we are no 
longer able
+// to process the state transition due to participant manager either 
timing out
+// or failing to process the state transition
+_stateTransitionResultMap.remove(transitionId);
+
+// Set the replica state to ERROR
+updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+// Notify the HelixGatewayParticipantClient that it is in ERROR state

Review Comment:
   should't we call " _gatewayServiceProcessor.sendStateTransitionMessage" here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: review

Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138


##
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java:
##
@@ -0,0 +1,60 @@
+package org.apache.helix.gateway.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(initialState = "OFFLINE", states = {})
+public class HelixGatewayMultiTopStateStateModel extends StateModel {
+  private static final Logger _logger =
+  LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
+
+  private final HelixStateTransitionProcessor _stateTransitionProcessor;
+
+  public HelixGatewayMultiTopStateStateModel(
+  HelixStateTransitionProcessor stateTransitionProcessor) {
+_stateTransitionProcessor = stateTransitionProcessor;
+  }
+
+  @Transition(to = "*", from = "*")
+  public void genericStateTransitionHandler(Message message, 
NotificationContext context)
+  throws Exception {
+_stateTransitionProcessor.processStateTransitionMessage(message);
+  }
+
+  @Override
+  public void reset() {
+// no-op we don't want to start from init state again.
+  }
+
+  @Override
+  public void rollbackOnError(Message message, NotificationContext context,

Review Comment:
   Please correct me if I am wrong. We are not using this method now in out 
first version right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138


##
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java:
##
@@ -0,0 +1,60 @@
+package org.apache.helix.gateway.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(initialState = "OFFLINE", states = {})
+public class HelixGatewayMultiTopStateStateModel extends StateModel {
+  private static final Logger _logger =
+  LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
+
+  private final HelixStateTransitionProcessor _stateTransitionProcessor;
+
+  public HelixGatewayMultiTopStateStateModel(
+  HelixStateTransitionProcessor stateTransitionProcessor) {
+_stateTransitionProcessor = stateTransitionProcessor;
+  }
+
+  @Transition(to = "*", from = "*")
+  public void genericStateTransitionHandler(Message message, 
NotificationContext context)
+  throws Exception {
+_stateTransitionProcessor.processStateTransitionMessage(message);
+  }
+
+  @Override
+  public void reset() {
+// no-op we don't want to start from init state again.
+  }
+
+  @Override
+  public void rollbackOnError(Message message, NotificationContext context,

Review Comment:
   Please correct me if I am wrong. We are not using this method now in out 
first version right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697699138


##
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java:
##
@@ -0,0 +1,60 @@
+package org.apache.helix.gateway.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(initialState = "OFFLINE", states = {})
+public class HelixGatewayMultiTopStateStateModel extends StateModel {
+  private static final Logger _logger =
+  LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
+
+  private final HelixStateTransitionProcessor _stateTransitionProcessor;
+
+  public HelixGatewayMultiTopStateStateModel(
+  HelixStateTransitionProcessor stateTransitionProcessor) {
+_stateTransitionProcessor = stateTransitionProcessor;
+  }
+
+  @Transition(to = "*", from = "*")
+  public void genericStateTransitionHandler(Message message, 
NotificationContext context)
+  throws Exception {
+_stateTransitionProcessor.processStateTransitionMessage(message);
+  }
+
+  @Override
+  public void reset() {
+// no-op we don't want to start from init state again.
+  }
+
+  @Override
+  public void rollbackOnError(Message message, NotificationContext context,

Review Comment:
   Please correct me if I am wrong. We are not invoke this method now in out 
first version right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697698123


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);

Review Comment:
   question. If we had a partiton being assigned to the instance, do we 
translate the Helix ST message "INIT->ONLINE" to "DROPPED->ONLINE"? That would 
be wired for user to process. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697690752


##
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java:
##
@@ -19,11 +19,22 @@
  * under the License.
  */
 
+import org.apache.helix.model.Message;
+
 /**
- * Translate from/to GRPC function call to Helix Gateway Service event.
+ * Helix Gateway Service Processor interface allows sending state transition 
messages to
+ * participants through service implementing this interface.
  */
 public interface HelixGatewayServiceProcessor {
 
-  public boolean sendStateTransitionMessage( String instanceName);
+  /**
+   * Send a state transition message to a remote participant.
+   *
+   * @param instanceName the name of the participant
+   * @param currentState the current state of the participant

Review Comment:
   the current state of the shard?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-30 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1697682957


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,246 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.participant.HelixStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant implements HelixStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processStateTransitionMessage(Message message) throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+  getCurrentState(resourceId, shardId), message);
+
+  boolean success = future.get();
+  if (!success) {
+throw new Exception("Failed to transition to state " + toState);

Review Comment:
   nit: if(future.get())



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-26 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1693698665


##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -89,33 +91,47 @@ public void run() {
* It includes waiting for ZK connection, and also wait for previous 
LiveInstance to expire.
*/
   class participantConnectionProcessor implements Runnable {
-GatewayServiceEvent _event;
+private final GatewayServiceEvent _event;
 
-public participantConnectionProcessor(GatewayServiceEvent event) {
+private participantConnectionProcessor(GatewayServiceEvent event) {
   _event = event;
 }
 
 @Override
 public void run() {
-  HelixGatewayService helixGatewayService;
-  _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(),
-  k -> new HelixGatewayService(GatewayServiceManager.this, 
_event.getClusterName()));
-  helixGatewayService = 
_helixGatewayServiceMap.get(_event.getClusterName());
   if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
-helixGatewayService.registerParticipant();
+createHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName(),
+_event.getShardStateMap());
   } else {
-helixGatewayService.deregisterParticipant(_event.getClusterName(), 
_event.getInstanceName());
+removeHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
   }
 }
   }
 
-  @VisibleForTesting
-  HelixGatewayServiceGrpcService getGrpcService() {
-return _grpcService;
+  private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
+  Map> initialShardStateMap) {
+// Create and add the participant to the participant map
+HelixGatewayParticipant.Builder participantBuilder =
+new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, 
instanceName, clusterName,
+_zkAddress).addInitialShardState(initialShardStateMap);
+SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
+participantBuilder::addMultiTopStateStateModelDefinition);
+_helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>())
+.put(instanceName, participantBuilder.build());
+  }
+
+  private void removeHelixGatewayParticipant(String clusterName, String 
instanceName) {
+// Disconnect and remove the participant from the participant map
+HelixGatewayParticipant participant = 
getHelixGatewayParticipant(clusterName, instanceName);
+if (participant != null) {

Review Comment:
   can we have null check in getter? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement Helix ST handling logic and HelixGatewayParticipant [helix]

2024-07-26 Thread via GitHub


xyuanlu commented on code in PR #2845:
URL: https://github.com/apache/helix/pull/2845#discussion_r1693481471


##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.participant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import 
org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.constant.MessageType;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+public class HelixGatewayParticipant implements 
HelixGatewayMultiTopStateStateTransitionProcessor {

Review Comment:
   nit: doc



##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -73,17 +75,20 @@ public void onCompleted() {
   /**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway 
service.
-   * @param instanceName
-   * @return
+   * @param instanceName the instance name to send the message to
+   * @param messageType the type of the message
+   * @param message the message to convert to the transition message
*/
   @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
+  public void sendStateTransitionMessage(String instanceName, MessageType 
messageType,

Review Comment:
   Maybe we do not need to pass the MessageType here. we could read the from 
and f=to state In translation util. Message type also confuses with 
org.apache.helix.model.Message.MessageType. 



##
helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java:
##
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.participant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import 
org.apache.helix.gateway.api.participant.HelixGatewayMultiTopStateStateTransitionProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.constant.MessageType;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+public class HelixGatewayParticipant implements 
HelixGatewayMultiTopStateStateTransitionProcessor {
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map> _shardStateMap;
+  private final Map> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  HelixManager participantManager, Map> 
initialShardStateMap) {
+_gatewayServiceProcessor = gatewayServiceProcessor;
+_participantManager = participantManager;
+_shardStateMap = initialShardStateMap;
+_stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void processMultiTopStateModelStateTransitionMessage(Message message) 
throws Exception {
+String transitionId = message.getMsgId();
+String resourceId = message.getResourceName();
+String shardId = message.getPartitionName();
+String toState = message.getToState();
+
+try {
+  if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+return;
+  }
+
+  CompletableFuture future = new CompletableFuture<>();
+  _stateTransitionResultMap.put(transitionId, future);
+  _gatewayServiceProcessor.sendStateTransitionMessage(
+  _participantManager.getInstanceName(),
+  determineTransitionType(resourceId, shardId, toState), message);
+
+  boolean success = future.get();
+  if (!success) {
+throw new Exception("Failed to transition to state " + toState);
+  }
+
+  updateState(resourceId, shardId, toState);
+} finally {
+  _stateTransitionResultMap.remove(transitionId);
+}
+  }
+
+  @Override
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+// Remove the stateTransitionResultMap future for the message
+String tra