Re: [PR] Implement GatewayServiceManager [helix]

2024-07-25 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1692188521


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;

Review Comment:
   Yes, it is implemented here:
https://github.com/apache/helix/tree/helix-gateway-service/helix-gateway/src/main/java/org/apache/helix/gateway/util



##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -1,17 +1,70 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  /**
+   * Translate from user sent ShardStateMessage message to Helix Gateway Service event.
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+GatewayServiceEvent.GateWayServiceEventBuilder builder;
+if (request.hasShardState()) { // init connection to gateway service
+  ShardState shardState = request.getShardState();
+  Map shardStateMap = new HashMap<>();
+  for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) {
+for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) {
+  shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState());
+}
+  }
+  builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
+  shardState.getClusterName()).setParticipantName(shardState.getInstanceName());
+} else {
+  ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus();
+  // this is status update for established connection
+  List status =
+  shardTransitionStatus.getShardTransitionStatusList();
+  List stResult = new ArrayList<>();
+  for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) {
+String stateTransitionId = shardTransition.getTransitionID();
+String stateTransitionStatus = shardTransition.getCurrentState();
+String shardState = shardTransition.getCurrentState();

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement GatewayServiceManager [helix]

2024-07-25 Thread via GitHub


xyuanlu commented on PR #2844:
URL: https://github.com/apache/helix/pull/2844#issuecomment-2251468466

   This change is ready to be merged. Approved by @junkaixue 
   commit message: Implement GatewayServiceManager. 





Re: [PR] Implement GatewayServiceManager [helix]

2024-07-25 Thread via GitHub


xyuanlu merged PR #2844:
URL: https://github.com/apache/helix/pull/2844





Re: [PR] Implement GatewayServiceManager [helix]

2024-07-25 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1692052245


##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -1,17 +1,70 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  /**
+   * Translate from user sent ShardStateMessage message to Helix Gateway Service event.
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+GatewayServiceEvent.GateWayServiceEventBuilder builder;
+if (request.hasShardState()) { // init connection to gateway service
+  ShardState shardState = request.getShardState();
+  Map shardStateMap = new HashMap<>();
+  for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) {
+for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) {
+  shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState());
+}
+  }
+  builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
+  shardState.getClusterName()).setParticipantName(shardState.getInstanceName());
+} else {
+  ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus();
+  // this is status update for established connection
+  List status =
+  shardTransitionStatus.getShardTransitionStatusList();
+  List stResult = new ArrayList<>();
+  for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) {
+String stateTransitionId = shardTransition.getTransitionID();
+String stateTransitionStatus = shardTransition.getCurrentState();
+String shardState = shardTransition.getCurrentState();

Review Comment:
   As I said, there is no need to create these variables when each is used only once.






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-25 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1692049083


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;

Review Comment:
   Is this implemented?






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-24 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1690626665


##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -1,17 +1,71 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  /**
+   * Translate from GRPC function call to Helix Gateway Service event.
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+if (request.hasShardState()) { // init connection to gateway service
+  ShardState shardState = request.getShardState();
+  String instanceName = shardState.getInstanceName();
+  String clusterName = shardState.getClusterName();

Review Comment:
   Thanks for the review. Updated.






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-24 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1690590639


##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -1,17 +1,71 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  /**
+   * Translate from GRPC function call to Helix Gateway Service event.
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+if (request.hasShardState()) { // init connection to gateway service
+  ShardState shardState = request.getShardState();
+  String instanceName = shardState.getInstanceName();
+  String clusterName = shardState.getClusterName();
+  Map shardStateMap = new HashMap<>();
+  for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) {
+for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) {
+  shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState());
+}
+  }
+  GatewayServiceEvent.GateWayServiceEventBuilder builder =
+  new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
+  clusterName).setParticipantName(instanceName);
+  return builder.build();
+} else {

Review Comment:
   If the if-branch ends with a return, no else clause is needed.
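
A minimal sketch of the shape this comment suggests. The types below (EarlyReturnSketch, Request, EventType) are hypothetical stand-ins for illustration, not the real Helix or protobuf classes, and this is not necessarily how the PR resolved the comment.

```java
// Hypothetical stand-ins; not the real Helix gateway classes.
final class EarlyReturnSketch {
  enum EventType { CONNECT, UPDATE }

  // Stand-in for ShardStateMessage: it either carries a shard state or it does not.
  static final class Request {
    final boolean hasShardState;

    Request(boolean hasShardState) {
      this.hasShardState = hasShardState;
    }
  }

  static EventType translate(Request request) {
    if (request.hasShardState) {
      // Initial connection: return right away.
      return EventType.CONNECT;
    }
    // No else clause: this point is reached only when the branch above did not return.
    return EventType.UPDATE;
  }
}
```

Dropping the else keeps the status-update path at the top indentation level of the method.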



##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -1,17 +1,71 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  /**
+   * Translate from GRPC function call to Helix Gateway Service event.
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+if (request.hasShardState()) { // init connection to gateway service
+  ShardState shardState = request.getShardState();
+  String instanceName = shardState.getInstanceName();
+  String clusterName = shardState.getClusterName();

Review Comment:
   No need to define a variable when it is used only once.
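
A small sketch of what inlining the single-use locals could look like. ShardState and EventBuilder here are simplified, hypothetical stand-ins (the real builder in the diff is GatewayServiceEvent.GateWayServiceEventBuilder, whose full API is not shown in this thread), and the returned String merely stands in for the built event.

```java
// Hypothetical stand-ins; the real proto message and builder are not reproduced here.
final class InlineSingleUseSketch {
  static final class ShardState {
    String getInstanceName() { return "instance_1"; }
    String getClusterName() { return "cluster_A"; }
  }

  static final class EventBuilder {
    private String clusterName;
    private String participantName;

    EventBuilder setClusterName(String clusterName) {
      this.clusterName = clusterName;
      return this;
    }

    EventBuilder setParticipantName(String participantName) {
      this.participantName = participantName;
      return this;
    }

    String build() {
      return clusterName + "/" + participantName;
    }
  }

  static String toEvent(ShardState shardState) {
    // The getters feed the builder directly instead of going through
    // instanceName/clusterName locals that would each be read only once.
    return new EventBuilder()
        .setClusterName(shardState.getClusterName())
        .setParticipantName(shardState.getInstanceName())
        .build();
  }
}
```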




Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688972784


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,122 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+implements HelixGatewayServiceProcessor {
+
+  // Map to store the observer for each instance
+  Map> _observerMap = new HashMap<>();
+  // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed.
+  Map, Pair> _reversedObserverMap = new HashMap<>();
+
+  ReadWriteLock _observerMapLock;
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+// register manager
+_manager = manager;
+_observerMapLock = new ReentrantReadWriteLock();
+  }
+
+  @Override
+  public StreamObserver report(
+  StreamObserver responseObserver) {
+
+return new StreamObserver() {
+
+  @Override
+  public void onNext(ShardStateMessage request) {
+if (request.hasShardState()) {
+  ShardState shardState = request.getShardState();
+  // Process shardState
+  updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver);
+}
+_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+  }
+
+  @Override
+  public void onError(Throwable t) {
+onClientClose(responseObserver);
+  }
+
+  @Override
+  public void onCompleted() {
+// called when the client completes
+onClientClose(responseObserver);
+  }
+};
+  }
+
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+StreamObserver observer;
+observer = _observerMap.get(instanceName);
+_observerMapLock.readLock().lock();

Review Comment:
   Use a fine-grained lock for each entry instead of one lock for the whole map.
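
A rough sketch of per-entry locking, as opposed to the single ReadWriteLock over the whole observer map in the hunk above. A later revision quoted in this thread imports a PerKeyLockRegistry utility, but its API is not shown here, so PerKeyLockSketch and its methods are hypothetical. Observers are plain Strings to keep the sketch self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch; not the PerKeyLockRegistry from the PR.
final class PerKeyLockSketch {
  // One lock per instance name, created on first use.
  private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
  // Stand-in for the instance-to-observer map.
  private final ConcurrentHashMap<String, String> observers = new ConcurrentHashMap<>();

  private <T> T withLock(String key, Supplier<T> action) {
    ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
    lock.lock();
    try {
      // The map is read or written only while the key's lock is held.
      return action.get();
    } finally {
      lock.unlock();
    }
  }

  void register(String instanceName, String observer) {
    withLock(instanceName, () -> observers.put(instanceName, observer));
  }

  // Returns true if an observer is registered for the instance.
  boolean send(String instanceName) {
    return withLock(instanceName, () -> observers.get(instanceName) != null);
  }
}
```

Operations on different instances do not block each other. This sketch never removes lock objects, so a long-running service would also need a cleanup strategy.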



##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java:
##
@@ -0,0 +1,77 @@
+package org.apache.helix.gateway.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.constant.MessageStatus;
+
+
+public  class GatewayServiceEvent {

Review Comment:
   Extra whitespace between "public" and "class".



##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -16,50 +19,85 @@
  */
 
 public class GatewayServiceManager {
-
-  HelixGatewayServiceService _helixGatewayServiceService;
-
-  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
   Map _helixGatewayServiceMap;
 
-  // TODO: add thread pool for init
-  // single thread tp for update
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-  public enum EventType {
-CONNECT,// init connection to gateway service
-UPDATE,  // update state transition result
-DISCONNECT // shutdown connection to gateway service.
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
 
-  public class GateWayServiceEvent {
-// event type
-EventType eventType;
-// event data
-String clusterName;
-String participantName;
-
-// todo: add more fields
-  }
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
 _helixGatewayServiceMap = new ConcurrentHashMap<>();
+_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
+_participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable

Review Comment:
   Define a constant for the pool size; use a DEFAULT value for now.
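
A sketch of replacing the literal 10 with a named default. The constant name and the factory method are hypothetical, chosen only for illustration; the value 10 is the one in the diff above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; the constant name is an assumption, not taken from the PR.
final class PoolSizeSketch {
  // Default size of the pool that handles participant connect/disconnect events.
  static final int DEFAULT_PARTICIPANT_CONNECTING_POOL_SIZE = 10;

  static ExecutorService newConnectingPool() {
    // The named default replaces the literal, so making it configurable
    // later touches a single place.
    return Executors.newFixedThreadPool(DEFAULT_PARTICIPANT_CONNECTING_POOL_SIZE);
  }
}
```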



##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,122 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import 

Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688780575


##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
   // TODO: add thread pool for init
   // single thread tp for update
 
-  public enum EventType {
-CONNECT,// init connection to gateway service
-UPDATE,  // update state transition result
-DISCONNECT // shutdown connection to gateway service.
-  }
+  // Map of cluster to HelixGatewayService
 
-  public class GateWayServiceEvent {
-// event type
-EventType eventType;
-// event data
-String clusterName;
-String participantName;
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-// todo: add more fields
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
+
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
 _helixGatewayServiceMap = new ConcurrentHashMap<>();
+_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
+_participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable
+_grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
 return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+  _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
+} else {
+  _participantConnectingTp.submit(new participantConnectionProcessor(event));
+}
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+GatewayServiceEvent _event;
+
+public shardStateUpdator(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName);
+  if (helixGatewayService == null) {
+// TODO: return error code and throw exception.
+return;
+  }
+  helixGatewayService.receiveSTResponse();
+}
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+GatewayServiceEvent _event;
+
+public participantConnectionProcessor(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService;
+  if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {

Review Comment:
   This is only for the initial connection and disconnect. UPDATE is handled in
shardStateUpdator.
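
The routing described here can be sketched roughly as follows, assuming the same split as newGatewayServiceEvent in the hunk above: UPDATE events go to a single-thread executor, everything else to a pool. EventRoutingSketch, its pool size of 2, and the String results are hypothetical stand-ins for the real Runnables.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; not the GatewayServiceManager implementation.
final class EventRoutingSketch {
  enum EventType { CONNECT, UPDATE, DISCONNECT }

  // A single thread runs state-transition updates one at a time, in submission order.
  private final ExecutorService updateExecutor = Executors.newSingleThreadExecutor();
  // Connect and disconnect handling may block, so it runs on a separate pool.
  private final ExecutorService connectionPool = Executors.newFixedThreadPool(2);

  Future<String> route(EventType type) {
    if (type == EventType.UPDATE) {
      return updateExecutor.submit(() -> "update:" + type);
    }
    return connectionPool.submit(() -> "connection:" + type);
  }

  // Convenience wrapper that waits for the handler and hides checked exceptions.
  String routeAndWait(EventType type) {
    try {
      return route(type).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  void shutdown() {
    updateExecutor.shutdown();
    connectionPool.shutdown();
  }
}
```

With this split, the connection handler never receives an UPDATE event as long as every caller goes through the routing method.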






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688759371


##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
   // TODO: add thread pool for init
   // single thread tp for update
 
-  public enum EventType {
-CONNECT,// init connection to gateway service
-UPDATE,  // update state transition result
-DISCONNECT // shutdown connection to gateway service.
-  }
+  // Map of cluster to HelixGatewayService
 
-  public class GateWayServiceEvent {
-// event type
-EventType eventType;
-// event data
-String clusterName;
-String participantName;
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-// todo: add more fields
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
+
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
 _helixGatewayServiceMap = new ConcurrentHashMap<>();
+_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
+_participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable
+_grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
 return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+  _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
+} else {
+  _participantConnectingTp.submit(new participantConnectionProcessor(event));
+}
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+GatewayServiceEvent _event;
+
+public shardStateUpdator(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName);
+  if (helixGatewayService == null) {
+// TODO: return error code and throw exception.
+return;
+  }
+  helixGatewayService.receiveSTResponse();
+}
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+GatewayServiceEvent _event;
+
+public participantConnectionProcessor(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService;
+  if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {

Review Comment:
   No... UPDATE still falls back to DISCONNECT path.
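
One way to rule out the fall-through raised here is to branch on every expected type explicitly and reject the rest, rather than treating "not CONNECT" as DISCONNECT. The sketch below is hypothetical (EventDispatchSketch and its handled list are illustration only) and is not necessarily what the PR ended up doing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of explicit branching inside a connection-event handler.
final class EventDispatchSketch {
  enum EventType { CONNECT, UPDATE, DISCONNECT }

  // Records which path ran, so the behavior is easy to observe.
  final List<String> handled = new ArrayList<>();

  void process(EventType type) {
    switch (type) {
      case CONNECT:
        handled.add("connect");
        break;
      case DISCONNECT:
        handled.add("disconnect");
        break;
      default:
        // UPDATE (or any future type) is rejected instead of silently
        // taking the disconnect path.
        throw new IllegalArgumentException("Unexpected event type: " + type);
    }
  }
}
```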






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688449541


##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
   // TODO: add thread pool for init
   // single thread tp for update
 
-  public enum EventType {
-CONNECT,// init connection to gateway service
-UPDATE,  // update state transition result
-DISCONNECT // shutdown connection to gateway service.
-  }
+  // Map of cluster to HelixGatewayService
 
-  public class GateWayServiceEvent {
-// event type
-EventType eventType;
-// event data
-String clusterName;
-String participantName;
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-// todo: add more fields
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
+
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
 _helixGatewayServiceMap = new ConcurrentHashMap<>();
+_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
+_participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable
+_grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
 return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+  _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
+} else {
+  _participantConnectingTp.submit(new participantConnectionProcessor(event));
+}
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+GatewayServiceEvent _event;
+
+public shardStateUpdator(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName);
+  if (helixGatewayService == null) {
+// TODO: return error code and throw exception.
+return;
+  }
+  helixGatewayService.receiveSTResponse();
+}
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+GatewayServiceEvent _event;
+
+public participantConnectionProcessor(GatewayServiceEvent event) {
+  _event = event;
+}
+
+@Override
+public void run() {
+  HelixGatewayService helixGatewayService;
+  if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {

Review Comment:
   Updated. The commits got messed up when I split the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org



Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688448617


##
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java:
##
@@ -0,0 +1,7 @@
+package org.apache.helix.gateway.constant;
+
+public enum GatewayServiceEventType {

Review Comment:
   Thanks for the review. I moved the enum to the constant folder.






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688396795


##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -11,7 +15,24 @@ public static TransitionMessage translateSTMsgToProto() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  public static GatewayServiceEvent translateProtoToEvent(ShardStateMessage request) {
+String instanceName = request.getInstanceName();
+String clusterName = request.getClusterName();
+
+Map shardStateMap = new HashMap<>();
+for (HelixGatewayServiceOuterClass.SingleShardState shardTransitionStatus : request.getShardStateList()) {
+  shardStateMap.put(shardTransitionStatus.getShardName(), shardTransitionStatus.getCurrentState());
+}
+GatewayServiceEvent.GateWayServiceEventBuilder builder =
+new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(clusterName);

Review Comment:
   Why is this CONNECT and not UPDATE?



##
helix-gateway/src/main/java/org/apache/helix/gateway/constant/ShardState.java:
##
@@ -0,0 +1,5 @@
+package org.apache.helix.gateway.constant;
+
+public enum ShardState {

Review Comment:
   Why not use the one from the state model? And how about the ERROR state?



##
helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java:
##
@@ -11,7 +15,24 @@ public static TransitionMessage translateSTMsgToProto() {
 return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
-return null;
+  public static GatewayServiceEvent translateProtoToEvent(ShardStateMessage request) {

Review Comment:
   I don't get this util at all...
   
   If you would like to differentiate the event types, you can just make one full util and selectively pass in null for the arguments that are not necessary.
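
One possible reading of this suggestion, as a minimal sketch: a single translation method that takes the event type explicitly and accepts null for arguments an event does not need. `UtilEventType`, `UtilEvent`, `TranslateUtilSketch`, and `toEvent` are hypothetical names for illustration only; they are not the PR's classes, and the real util works on the proto `ShardStateMessage` rather than plain strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for GatewayServiceEventType.
enum UtilEventType { CONNECT, UPDATE, DISCONNECT }

// Hypothetical, simplified event carrying only the fields used in this sketch.
final class UtilEvent {
  final UtilEventType eventType;
  final String clusterName;
  final String instanceName;
  final Map<String, String> shardStates; // empty when the caller passed null

  UtilEvent(UtilEventType eventType, String clusterName, String instanceName,
      Map<String, String> shardStates) {
    this.eventType = eventType;
    this.clusterName = clusterName;
    this.instanceName = instanceName;
    this.shardStates = shardStates;
  }
}

final class TranslateUtilSketch {
  private TranslateUtilSketch() {
  }

  // One entry point for every event type; optional arguments may be null.
  static UtilEvent toEvent(UtilEventType eventType, String clusterName, String instanceName,
      Map<String, String> shardStatesOrNull) {
    Objects.requireNonNull(eventType, "eventType");
    // Normalize a missing map to an empty one so readers never see null.
    Map<String, String> shardStates = shardStatesOrNull == null
        ? Collections.<String, String>emptyMap()
        : Collections.unmodifiableMap(new HashMap<>(shardStatesOrNull));
    return new UtilEvent(eventType, clusterName, instanceName, shardStates);
  }
}
```

With this shape, a DISCONNECT caller would pass null for the shard states, while an UPDATE caller would pass the map built from the request.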



##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java:
##
@@ -0,0 +1,95 @@
+package org.apache.helix.gateway.service;
+
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+
+
+public  class GatewayServiceEvent {
+  // event type
+  GatewayServiceEventType _eventType;
+  // event data
+  String _clusterName;
+  String _instanceName;
+  Map> _shardStateMap;
+
+  StateTransitionResult _StateTransitionResult;
+
+  public class StateTransitionResult {
+String stateTransitionId;
+String stateTransitionStatus;
+String shardState;
+
+public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) {
+  this.stateTransitionId = stateTransitionId;
+  this.stateTransitionStatus = stateTransitionStatus;
+  this.shardState = shardState;
+}
+  }
+
+  private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName) {
+this._eventType = eventType;
+this._clusterName = clusterName;
+this._instanceName = instanceName;
+  }
+
+  private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName,  Map> shardStateMap) {
+this._eventType = eventType;
+this._clusterName = clusterName;
+this._instanceName = instanceName;
+this._shardStateMap = shardStateMap;
+  }
+
+  private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName, String stateTransitionId,
+  String stateTransitionStatus, String shardState) {
+this._eventType = eventType;
+this._clusterName = clusterName;
+this._instanceName = instanceName;
+this._StateTransitionResult = new StateTransitionResult(stateTransitionId, stateTransitionStatus, shardState);
+  }
+

Review Comment:
   If we have a builder, there is no need to have so many constructors.
   
   Also, even with public constructors, we should call "this(xxx)" to reduce redundant code.
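
Both points can be illustrated with a minimal sketch, assuming a simplified event with a flat shard-state map. `SketchEventType`, `SketchEvent`, and `ChainedEvent` are hypothetical names, not the PR's classes. The first class keeps a single private constructor fed by a builder; the second shows overloaded constructors delegating with `this(...)` when constructors are kept.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GatewayServiceEventType.
enum SketchEventType { CONNECT, UPDATE, DISCONNECT }

// Option 1: one private constructor; the builder carries the optional fields.
final class SketchEvent {
  private final SketchEventType eventType;
  private final String clusterName;
  private final String instanceName;
  private final Map<String, String> shardStateMap;

  // The only constructor: every field is copied from the builder in one place.
  private SketchEvent(Builder builder) {
    this.eventType = builder.eventType;
    this.clusterName = builder.clusterName;
    this.instanceName = builder.instanceName;
    this.shardStateMap = Collections.unmodifiableMap(new HashMap<>(builder.shardStateMap));
  }

  SketchEventType getEventType() { return eventType; }
  String getClusterName() { return clusterName; }
  String getInstanceName() { return instanceName; }
  Map<String, String> getShardStateMap() { return shardStateMap; }

  static final class Builder {
    private final SketchEventType eventType;
    private String clusterName;
    private String instanceName;
    private Map<String, String> shardStateMap = new HashMap<>();

    Builder(SketchEventType eventType) { this.eventType = eventType; }

    Builder setClusterName(String clusterName) { this.clusterName = clusterName; return this; }
    Builder setInstanceName(String instanceName) { this.instanceName = instanceName; return this; }
    Builder setShardStateMap(Map<String, String> shardStateMap) {
      this.shardStateMap = new HashMap<>(shardStateMap);
      return this;
    }

    SketchEvent build() { return new SketchEvent(this); }
  }
}

// Option 2: if overloads stay, the shorter one delegates with this(...).
final class ChainedEvent {
  final SketchEventType eventType;
  final String clusterName;
  final String instanceName;
  final Map<String, String> shardStateMap;

  ChainedEvent(SketchEventType eventType, String clusterName, String instanceName) {
    this(eventType, clusterName, instanceName, Collections.<String, String>emptyMap());
  }

  ChainedEvent(SketchEventType eventType, String clusterName, String instanceName,
      Map<String, String> shardStateMap) {
    this.eventType = eventType;
    this.clusterName = clusterName;
    this.instanceName = instanceName;
    this.shardStateMap = shardStateMap;
  }
}
```

In either option the field assignments live in exactly one constructor, so adding a field later touches one place instead of three.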



##
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
   // TODO: add thread pool for init
   // single thread tp for update
 
-  public enum EventType {
-CONNECT,// init connection to gateway service
-UPDATE,  // update state transition result
-DISCONNECT // shutdown connection to gateway service.
-  }
+  // Map of cluster to HelixGatewayService
 
-  public class GateWayServiceEvent {
-// event type
-EventType eventType;
-// event data
-String clusterName;
-String participantName;
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-// todo: add more fields
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
+

Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


frankmu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688378477


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,92 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+implements HelixGatewayServiceProcessor {
+
+  Map> _observerMap =
+  new ConcurrentHashMap>();
+  Map, String> _reversedObserverMap =
+  new ConcurrentHashMap, String>();
+
+  Map instanceToClusterMap = new ConcurrentHashMap();
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+// register manager
+_manager = manager;
+  }
+
+  @Override
+  public StreamObserver report(
+  StreamObserver responseObserver) {
+
+return new StreamObserver() {
+
+  @Override
+  public void onNext(ShardStateMessage request) {
+// called when a client sends a message
+//
+String instanceName = request.getInstanceName();
+String clusterName = request.getClusterName();
+GatewayServiceEvent event;
+instanceToClusterMap.put(instanceName, clusterName);
+if (!_observerMap.containsValue(instanceName)) {
+  // new connection
+  updateObserver(instanceName, responseObserver);
+}

Review Comment:
   Not sure if we want to make this class thread-safe. If so, this check-then-act sequence might cause a race condition: 
https://stackoverflow.com/questions/15850803/concurrenthashmap-putifabsent-method-functionality
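
A minimal sketch of the concern, under the assumption that the observer map is keyed by instance name: a separate "contains" check followed by a "put" lets two threads both see the key as absent, whereas `ConcurrentMap.putIfAbsent` does the check and the insert as one atomic step. `ObserverRegistrySketch` and `registerIfNew` are hypothetical names, not the PR's code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry standing in for the observer map in the gRPC service.
final class ObserverRegistrySketch<O> {
  private final ConcurrentMap<String, O> observers = new ConcurrentHashMap<>();

  // Atomic check-and-insert: returns true only for the caller that registered.
  boolean registerIfNew(String instanceName, O observer) {
    return observers.putIfAbsent(instanceName, observer) == null;
  }

  O get(String instanceName) {
    return observers.get(instanceName);
  }
}

final class ObserverRegistryDemo {
  public static void main(String[] args) throws InterruptedException {
    ObserverRegistrySketch<String> registry = new ObserverRegistrySketch<>();
    AtomicInteger winners = new AtomicInteger();
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      final String observer = "observer-" + i;
      // All threads race to register the same instance name.
      threads[i] = new Thread(() -> {
        if (registry.registerIfNew("instance-1", observer)) {
          winners.incrementAndGet();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // Exactly one thread wins the registration, whichever runs first.
    System.out.println("winners = " + winners.get());
  }
}
```

Because the insert is atomic, only one caller treats the connection as new, so any follow-up work such as filling a reverse map can be guarded by the returned boolean.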






Re: [PR] Implement GatewayServiceManager [helix]

2024-07-23 Thread via GitHub


xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688381674


##
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##
@@ -0,0 +1,92 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+implements HelixGatewayServiceProcessor {
+
+  Map> _observerMap =
+  new ConcurrentHashMap>();
+  Map, String> _reversedObserverMap =
+  new ConcurrentHashMap, String>();
+
+  Map instanceToClusterMap = new ConcurrentHashMap();
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+// register manager
+_manager = manager;
+  }
+
+  @Override
+  public StreamObserver report(
+  StreamObserver responseObserver) {
+
+return new StreamObserver() {
+
+  @Override
+  public void onNext(ShardStateMessage request) {
+// called when a client sends a message
+//
+String instanceName = request.getInstanceName();
+String clusterName = request.getClusterName();
+GatewayServiceEvent event;
+instanceToClusterMap.put(instanceName, clusterName);
+if (!_observerMap.containsValue(instanceName)) {
+  // new connection
+  updateObserver(instanceName, responseObserver);
+}

Review Comment:
   Good point. Will update.


