xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688780575


##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##########
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
   // TODO: add thread pool for init
   // single thread tp for update
 
-  public enum EventType {
-    CONNECT,    // init connection to gateway service
-    UPDATE,  // update state transition result
-    DISCONNECT // shutdown connection to gateway service.
-  }
+  // Map of cluster to HelixGatewayService
 
-  public class GateWayServiceEvent {
-    // event type
-    EventType eventType;
-    // event data
-    String clusterName;
-    String participantName;
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-    // todo: add more fields
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
+
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
     _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
+    _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make 
it configurable
+    _grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
     return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+    if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+      _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
+    } else {
+      _participantConnectingTp.submit(new 
participantConnectionProcessor(event));
+    }
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+    GatewayServiceEvent _event;
+
+    public shardStateUpdator(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName);
+      if (helixGatewayService == null) {
+        // TODO: return error code and throw exception.
+        return;
+      }
+      helixGatewayService.receiveSTResponse();
+    }
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous 
Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+    GatewayServiceEvent _event;
+
+    public participantConnectionProcessor(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService;
+      if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {

Review Comment:
   This is only for init connection and disconnect. update if handled in 
shardStateUpdator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to