kaisun2000 commented on a change in pull request #789: Add FederatedZkClient
URL: https://github.com/apache/helix/pull/789#discussion_r384152586
 
 

 ##########
 File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
 ##########
 @@ -20,344 +20,487 @@
  */
 
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements and supports all ZK operations defined in interface {@link 
RealmAwareZkClient},
+ * except for session-aware operations such as creating ephemeral nodes, for 
which
+ * an {@link UnsupportedOperationException} will be thrown.
+ * <p>
+ * It acts as a single ZK client but will automatically route 
read/write/change subscription
+ * requests to the corresponding ZkClient with the help of metadata store 
directory service.
+ * It could connect to multiple ZK addresses and maintain a {@link ZkClient} 
for each ZK address.
+ */
+public class FederatedZkClient implements RealmAwareZkClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederatedZkClient.class);
 
+  private static final String FEDERATED_ZK_CLIENT = 
FederatedZkClient.class.getSimpleName();
+  private static final String DEDICATED_ZK_CLIENT_FACTORY =
+      DedicatedZkClientFactory.class.getSimpleName();
+
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
+
+  // ZK realm -> ZkClient
+  private final Map<String, ZkClient> _zkRealmToZkClientMap;
+
+  private volatile boolean _isClosed;
+  private PathBasedZkSerializer _pathBasedZkSerializer;
+
+  // TODO: support capacity of ZkClient number in one FederatedZkClient and do 
garbage collection.
+  public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig 
clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be 
null!");
+    }
+    if (clientConfig == null) {
+      throw new IllegalArgumentException("Client config cannot be null!");
+    }
+
+    _isClosed = false;
+    _clientConfig = clientConfig;
+    _pathBasedZkSerializer = clientConfig.getZkSerializer();
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+    _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+  }
 
-public class FederatedZkClient implements RealmAwareZkClient {
   @Override
   public List<String> subscribeChildChanges(String path, IZkChildListener 
listener) {
-    return null;
+    return getZkClient(path).subscribeChildChanges(path, listener);
   }
 
   @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-
+    getZkClient(path).unsubscribeChildChanges(path, listener);
   }
 
   @Override
   public void subscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).subscribeDataChanges(path, listener);
   }
 
   @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).unsubscribeDataChanges(path, listener);
   }
 
   @Override
   public void subscribeStateChanges(IZkStateListener listener) {
-
+    throwUnsupportedOperationException();
 
 Review comment:
   Add a message to the exception here, something like "subscribeStateChanges 
not supported in FederatedZkClient".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to