frankmu commented on code in PR #2844: URL: https://github.com/apache/helix/pull/2844#discussion_r1688378477
########## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ########## @@ -0,0 +1,92 @@ +package org.apache.helix.gateway.grpcservice; + +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import proto.org.apache.helix.gateway.*; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; + +import java.util.Map; + + +/** + * Helix Gateway Service GRPC UI implementation. + */ +public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase + implements HelixGatewayServiceProcessor { + + Map<String, StreamObserver<TransitionMessage>> _observerMap = + new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>(); + Map<StreamObserver<TransitionMessage>, String> _reversedObserverMap = + new ConcurrentHashMap<StreamObserver<TransitionMessage>, String>(); + + Map<String, String> instanceToClusterMap = new ConcurrentHashMap<String, String>(); + GatewayServiceManager _manager; + + public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { + // register manager + _manager = manager; + } + + @Override + public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + + return new StreamObserver<ShardStateMessage>() { + + @Override + public void onNext(ShardStateMessage request) { + // called when a client sends a message + //.... 
+ String instanceName = request.getInstanceName(); + String clusterName = request.getClusterName(); + GatewayServiceEvent event; + instanceToClusterMap.put(instanceName, clusterName); + if (!_observerMap.containsValue(instanceName)) { + // new connection + updateObserver(instanceName, responseObserver); + } Review Comment: Not sure if we want to make this class thread-safe. If yes, this might cause a race condition, because the check on `_observerMap` and the subsequent `updateObserver` call are two separate steps rather than one atomic operation: https://stackoverflow.com/questions/15850803/concurrenthashmap-putifabsent-method-functionality -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org