frankmu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688378477


##########
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.*;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
+
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+    implements HelixGatewayServiceProcessor {
+
+  Map<String, StreamObserver<TransitionMessage>> _observerMap =
+      new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();
+  Map<StreamObserver<TransitionMessage>, String> _reversedObserverMap =
+      new ConcurrentHashMap<StreamObserver<TransitionMessage>, String>();
+
+  Map<String, String> instanceToClusterMap = new ConcurrentHashMap<String, 
String>();
+  GatewayServiceManager _manager;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+    // register manager
+    _manager = manager;
+  }
+
+  @Override
+  public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
+      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
+
+    return new StreamObserver<ShardStateMessage>() {
+
+      @Override
+      public void onNext(ShardStateMessage request) {
+        // called when a client sends a message
+        //....
+        String instanceName = request.getInstanceName();
+        String clusterName = request.getClusterName();
+        GatewayServiceEvent event;
+        instanceToClusterMap.put(instanceName, clusterName);
+        if (!_observerMap.containsValue(instanceName)) {
+          // new connection
+          updateObserver(instanceName, responseObserver);
+        }

Review Comment:
   Not sure if we want to make this class thread safe. If yes, this might cause 
race condition: 
https://stackoverflow.com/questions/15850803/concurrenthashmap-putifabsent-method-functionality



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to