junkaixue commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688972784


##########
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+    implements HelixGatewayServiceProcessor {
+
+  // Map to store the observer for each instance
+  Map<String, StreamObserver<TransitionMessage>> _observerMap = new 
HashMap<>();
+  // A reverse map to store the instance name for each observer. It is used to 
find the instance when connection is closed.
+  Map<StreamObserver<TransitionMessage>, Pair<String, String>> 
_reversedObserverMap = new HashMap<>();
+
+  ReadWriteLock _observerMapLock;
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+    // register manager
+    _manager = manager;
+    _observerMapLock = new ReentrantReadWriteLock();
+  }
+
+  @Override
+  public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
+      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
+
+    return new StreamObserver<ShardStateMessage>() {
+
+      @Override
+      public void onNext(ShardStateMessage request) {
+        if (request.hasShardState()) {
+          ShardState shardState = request.getShardState();
+          // Process shardState
+          updateObserver(shardState.getInstanceName(), 
shardState.getClusterName(), responseObserver);
+        }
+        
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        onClientClose(responseObserver);
+      }
+
+      @Override
+      public void onCompleted() {
+        // called when the client completes
+        onClientClose(responseObserver);
+      }
+    };
+  }
+
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+    StreamObserver<TransitionMessage> observer;
+    observer = _observerMap.get(instanceName);
+    _observerMapLock.readLock().lock();

Review Comment:
   use fine-grained lock for each entry.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.helix.gateway.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.constant.MessageStatus;
+
+
+public  class GatewayServiceEvent {

Review Comment:
   white space.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##########
@@ -16,50 +19,85 @@
  */
 
 public class GatewayServiceManager {
-
-  HelixGatewayServiceService _helixGatewayServiceService;
-
-  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
   Map<String, HelixGatewayService> _helixGatewayServiceMap;
 
-  // TODO: add thread pool for init
-  // single thread tp for update
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-  public enum EventType {
-    CONNECT,    // init connection to gateway service
-    UPDATE,  // update state transition result
-    DISCONNECT // shutdown connection to gateway service.
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
 
-  public class GateWayServiceEvent {
-    // event type
-    EventType eventType;
-    // event data
-    String clusterName;
-    String participantName;
-
-    // todo: add more fields
-  }
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
     _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
+    _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make 
it configurable

Review Comment:
   Define variable. use DEFAULT one now.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+    implements HelixGatewayServiceProcessor {
+
+  // Map to store the observer for each instance
+  Map<String, StreamObserver<TransitionMessage>> _observerMap = new 
HashMap<>();
+  // A reverse map to store the instance name for each observer. It is used to 
find the instance when connection is closed.
+  Map<StreamObserver<TransitionMessage>, Pair<String, String>> 
_reversedObserverMap = new HashMap<>();
+
+  ReadWriteLock _observerMapLock;
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+    // register manager
+    _manager = manager;
+    _observerMapLock = new ReentrantReadWriteLock();
+  }
+
+  @Override
+  public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
+      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
+
+    return new StreamObserver<ShardStateMessage>() {
+
+      @Override
+      public void onNext(ShardStateMessage request) {
+        if (request.hasShardState()) {
+          ShardState shardState = request.getShardState();
+          // Process shardState
+          updateObserver(shardState.getInstanceName(), 
shardState.getClusterName(), responseObserver);
+        }
+        
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        onClientClose(responseObserver);
+      }
+
+      @Override
+      public void onCompleted() {
+        // called when the client completes
+        onClientClose(responseObserver);
+      }
+    };
+  }
+
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+    StreamObserver<TransitionMessage> observer;
+    observer = _observerMap.get(instanceName);
+    _observerMapLock.readLock().lock();

Review Comment:
   also no need for read-write lock. read-write lock to guarantee data read 
after write, not apply for this case.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.helix.gateway.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.constant.MessageStatus;
+
+
+public  class GatewayServiceEvent {
+  // event type
+  GatewayServiceEventType _eventType;
+  // event data
+  String _clusterName;
+  String _instanceName;
+  Map<String, Map<String, String>> _shardStateMap;

Review Comment:
   Make it private.



##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##########
@@ -16,50 +19,85 @@
  */
 
 public class GatewayServiceManager {
-
-  HelixGatewayServiceService _helixGatewayServiceService;
-
-  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
   Map<String, HelixGatewayService> _helixGatewayServiceMap;
 
-  // TODO: add thread pool for init
-  // single thread tp for update
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-  public enum EventType {
-    CONNECT,    // init connection to gateway service
-    UPDATE,  // update state transition result
-    DISCONNECT // shutdown connection to gateway service.
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
 
-  public class GateWayServiceEvent {
-    // event type
-    EventType eventType;
-    // event data
-    String clusterName;
-    String participantName;
-
-    // todo: add more fields
-  }
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
     _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
+    _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make 
it configurable
+    _grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
     return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+    if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+      _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
+    } else {
+      _participantConnectingTp.submit(new 
participantConnectionProcessor(event));
+    }
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+    GatewayServiceEvent _event;
+
+    public shardStateUpdator(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName);
+      if (helixGatewayService == null) {
+        // TODO: return error code and throw exception.
+        return;
+      }
+      helixGatewayService.receiveSTResponse();
+    }
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous 
Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+    GatewayServiceEvent _event;
+
+    public participantConnectionProcessor(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService;
+      if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
+        if (_helixGatewayServiceMap.containsKey(_event._clusterName)) {
+          helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName);
+        } else {
+          helixGatewayService = new 
HelixGatewayService(GatewayServiceManager.this, _event._clusterName);
+          _helixGatewayServiceMap.put(_event._clusterName, 
helixGatewayService);
+        }
+        helixGatewayService.registerParticipant();
+      } else { // disconnected
+        helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName);

Review Comment:
   NPE?



##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##########
@@ -16,50 +19,85 @@
  */
 
 public class GatewayServiceManager {
-
-  HelixGatewayServiceService _helixGatewayServiceService;
-
-  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
   Map<String, HelixGatewayService> _helixGatewayServiceMap;
 
-  // TODO: add thread pool for init
-  // single thread tp for update
+  // a single thread tp for event processing
+  ExecutorService _participantStateTransitionResultUpdator;
 
-  public enum EventType {
-    CONNECT,    // init connection to gateway service
-    UPDATE,  // update state transition result
-    DISCONNECT // shutdown connection to gateway service.
-  }
+  // a thread pool for participant connecting
+  ExecutorService _participantConnectingTp;
 
-  public class GateWayServiceEvent {
-    // event type
-    EventType eventType;
-    // event data
-    String clusterName;
-    String participantName;
-
-    // todo: add more fields
-  }
+  // grpc service
+  HelixGatewayServiceGrpcService _grpcService;
 
   public GatewayServiceManager() {
     _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
+    _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make 
it configurable
+    _grpcService = new HelixGatewayServiceGrpcService(this);
   }
 
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
 
     return null;
   }
 
-  public void updateShardState() {
-
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+    if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+      _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
+    } else {
+      _participantConnectingTp.submit(new 
participantConnectionProcessor(event));
+    }
   }
 
-  public void newParticipantConnecting() {
-
+  /**
+   * Update shard state
+   */
+  class shardStateUpdator implements Runnable {
+
+    GatewayServiceEvent _event;
+
+    public shardStateUpdator(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName);
+      if (helixGatewayService == null) {
+        // TODO: return error code and throw exception.
+        return;
+      }
+      helixGatewayService.receiveSTResponse();
+    }
   }
 
-  public void participantDisconnected() {
-
+  /**
+   * Create HelixGatewayService instance and register it to the manager.
+   * It includes waiting for ZK connection, and also wait for previous 
Liveinstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+    GatewayServiceEvent _event;
+
+    public participantConnectionProcessor(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService helixGatewayService;
+      if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
+        if (_helixGatewayServiceMap.containsKey(_event._clusterName)) {
+          helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName);
+        } else {
+          helixGatewayService = new 
HelixGatewayService(GatewayServiceManager.this, _event._clusterName);
+          _helixGatewayServiceMap.put(_event._clusterName, 
helixGatewayService);

Review Comment:
   if (! contains) {
    new object
   }
   
   do xxxxxx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to