narendly commented on a change in pull request #789: Add FederatedZkClient
URL: https://github.com/apache/helix/pull/789#discussion_r385445396
 
 

 ##########
 File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
 ##########
 @@ -20,344 +20,508 @@
  */
 
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements and supports all ZK operations defined in interface {@link 
RealmAwareZkClient},
+ * except for session-aware operations such as creating ephemeral nodes, for 
which
+ * an {@link UnsupportedOperationException} will be thrown.
+ * <p>
+ * It acts as a single ZK client but will automatically route 
read/write/change subscription
+ * requests to the corresponding ZkClient with the help of metadata store 
directory service.
+ * It could connect to multiple ZK addresses and maintain a {@link ZkClient} 
for each ZK address.
+ * <p>
+ * Note: while the order in which data/child change listeners are handled
+ * within a single ZK realm is guaranteed, the order across different ZK
+ * realms is NOT guaranteed, which means listeners from different ZK realms
+ * could be handled concurrently. Anyone implementing listeners for different
+ * ZK realms should therefore be aware of this concurrency.
+ */
+public class FederatedZkClient implements RealmAwareZkClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederatedZkClient.class);
 
+  private static final String FEDERATED_ZK_CLIENT = 
FederatedZkClient.class.getSimpleName();
+  private static final String DEDICATED_ZK_CLIENT_FACTORY =
+      DedicatedZkClientFactory.class.getSimpleName();
+
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
+
+  // ZK realm -> ZkClient
+  private final Map<String, ZkClient> _zkRealmToZkClientMap;
+
+  private volatile boolean _isClosed;
+  private PathBasedZkSerializer _pathBasedZkSerializer;
+
+  // TODO: support capacity of ZkClient number in one FederatedZkClient and do 
garbage collection.
+  public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig 
clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be 
null!");
+    }
+    if (clientConfig == null) {
+      throw new IllegalArgumentException("Client config cannot be null!");
+    }
+
+    _isClosed = false;
+    _clientConfig = clientConfig;
+    _pathBasedZkSerializer = clientConfig.getZkSerializer();
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+    _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+  }
 
-public class FederatedZkClient implements RealmAwareZkClient {
   @Override
   public List<String> subscribeChildChanges(String path, IZkChildListener 
listener) {
-    return null;
+    return getZkClient(path).subscribeChildChanges(path, listener);
   }
 
   @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-
+    getZkClient(path).unsubscribeChildChanges(path, listener);
   }
 
   @Override
   public void subscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).subscribeDataChanges(path, listener);
   }
 
   @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).unsubscribeDataChanges(path, listener);
   }
 
   @Override
   public void subscribeStateChanges(IZkStateListener listener) {
-
+    throwUnsupportedOperationException();
   }
 
   @Override
   public void unsubscribeStateChanges(IZkStateListener listener) {
+    throwUnsupportedOperationException();
+  }
 
+  @Override
+  public void subscribeStateChanges(
+      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener 
listener) {
+    throwUnsupportedOperationException();
   }
 
   @Override
-  public void unsubscribeAll() {
+  public void unsubscribeStateChanges(
+      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener 
listener) {
+    throwUnsupportedOperationException();
+  }
 
+  @Override
+  public void unsubscribeAll() {
+    _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
   }
 
   @Override
   public void createPersistent(String path) {
-
+    createPersistent(path, false);
   }
 
   @Override
   public void createPersistent(String path, boolean createParents) {
-
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
   }
 
   @Override
   public void createPersistent(String path, boolean createParents, List<ACL> 
acl) {
-
+    getZkClient(path).createPersistent(path, createParents, acl);
   }
 
   @Override
   public void createPersistent(String path, Object data) {
-
+    create(path, data, CreateMode.PERSISTENT);
   }
 
   @Override
   public void createPersistent(String path, Object data, List<ACL> acl) {
-
+    create(path, data, acl, CreateMode.PERSISTENT);
   }
 
   @Override
   public String createPersistentSequential(String path, Object data) {
-    return null;
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
   }
 
   @Override
   public String createPersistentSequential(String path, Object data, List<ACL> 
acl) {
-    return null;
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
   }
 
   @Override
   public void createEphemeral(String path) {
-
+    create(path, null, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, String sessionId) {
-
+    createEphemeral(path, null, sessionId);
   }
 
   @Override
   public void createEphemeral(String path, List<ACL> acl) {
-
+    create(path, null, acl, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, List<ACL> acl, String sessionId) {
-
+    create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
   }
 
   @Override
   public String create(String path, Object data, CreateMode mode) {
-    return null;
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
   }
 
   @Override
-  public String create(String path, Object datat, List<ACL> acl, CreateMode 
mode) {
-    return null;
+  public String create(String path, Object data, List<ACL> acl, CreateMode 
mode) {
+    return create(path, data, acl, mode, null);
   }
 
   @Override
   public void createEphemeral(String path, Object data) {
-
+    create(path, data, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, Object data, String sessionId) {
-
+    create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, 
sessionId);
   }
 
   @Override
   public void createEphemeral(String path, Object data, List<ACL> acl) {
-
+    create(path, data, acl, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, Object data, List<ACL> acl, String 
sessionId) {
-
+    create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data) {
-    return null;
+    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, List<ACL> 
acl) {
-    return null;
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, String 
sessionId) {
-    return null;
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL_SEQUENTIAL,
+        sessionId);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, List<ACL> 
acl,
       String sessionId) {
-    return null;
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
   }
 
   @Override
   public List<String> getChildren(String path) {
-    return null;
+    return getZkClient(path).getChildren(path);
   }
 
   @Override
   public int countChildren(String path) {
-    return 0;
+    return getZkClient(path).countChildren(path);
   }
 
   @Override
   public boolean exists(String path) {
-    return false;
+    return getZkClient(path).exists(path);
   }
 
   @Override
   public Stat getStat(String path) {
-    return null;
+    return getZkClient(path).getStat(path);
   }
 
   @Override
   public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
-    return false;
+    return getZkClient(path).waitUntilExists(path, timeUnit, time);
   }
 
   @Override
   public void deleteRecursively(String path) {
-
+    getZkClient(path).deleteRecursively(path);
   }
 
   @Override
   public boolean delete(String path) {
-    return false;
+    return getZkClient(path).delete(path);
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T> T readData(String path) {
-    return null;
+    return (T) readData(path, false);
   }
 
   @Override
   public <T> T readData(String path, boolean returnNullIfPathNotExists) {
-    return null;
+    return getZkClient(path).readData(path, returnNullIfPathNotExists);
   }
 
   @Override
   public <T> T readData(String path, Stat stat) {
-    return null;
+    return getZkClient(path).readData(path, stat);
   }
 
   @Override
   public <T> T readData(String path, Stat stat, boolean watch) {
-    return null;
+    return getZkClient(path).readData(path, stat, watch);
   }
 
   @Override
   public <T> T readDataAndStat(String path, Stat stat, boolean 
returnNullIfPathNotExists) {
-    return null;
+    return getZkClient(path).readData(path, stat, returnNullIfPathNotExists);
   }
 
   @Override
   public void writeData(String path, Object object) {
-
+    writeData(path, object, -1);
   }
 
   @Override
   public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
-
+    getZkClient(path).updateDataSerialized(path, updater);
   }
 
   @Override
-  public void writeData(String path, Object datat, int expectedVersion) {
-
+  public void writeData(String path, Object data, int expectedVersion) {
+    writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public Stat writeDataReturnStat(String path, Object datat, int 
expectedVersion) {
-    return null;
+  public Stat writeDataReturnStat(String path, Object data, int 
expectedVersion) {
+    return getZkClient(path).writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) 
{
-    return null;
+  public Stat writeDataGetStat(String path, Object data, int expectedVersion) {
+    return writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public void asyncCreate(String path, Object datat, CreateMode mode,
+  public void asyncCreate(String path, Object data, CreateMode mode,
       ZkAsyncCallbacks.CreateCallbackHandler cb) {
-
+    getZkClient(path).asyncCreate(path, data, mode, cb);
   }
 
   @Override
-  public void asyncSetData(String path, Object datat, int version,
+  public void asyncSetData(String path, Object data, int version,
       ZkAsyncCallbacks.SetDataCallbackHandler cb) {
-
+    getZkClient(path).asyncSetData(path, data, version, cb);
   }
 
   @Override
   public void asyncGetData(String path, 
ZkAsyncCallbacks.GetDataCallbackHandler cb) {
-
+    getZkClient(path).asyncGetData(path, cb);
   }
 
   @Override
   public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler 
cb) {
-
+    getZkClient(path).asyncExists(path, cb);
   }
 
   @Override
   public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler 
cb) {
-
+    getZkClient(path).asyncDelete(path, cb);
   }
 
   @Override
   public void watchForData(String path) {
-
+    getZkClient(path).watchForData(path);
   }
 
   @Override
   public List<String> watchForChilds(String path) {
-    return null;
+    return getZkClient(path).watchForChilds(path);
   }
 
   @Override
   public long getCreationTime(String path) {
-    return 0;
+    return getZkClient(path).getCreationTime(path);
   }
 
   @Override
   public List<OpResult> multi(Iterable<Op> ops) {
+    throwUnsupportedOperationException();
     return null;
   }
 
   @Override
   public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+    throwUnsupportedOperationException();
     return false;
   }
 
   @Override
   public String getServers() {
+    throwUnsupportedOperationException();
     return null;
   }
 
   @Override
   public long getSessionId() {
-    return 0;
+    // Session-aware is unsupported.
+    throwUnsupportedOperationException();
+    return 0L;
   }
 
   @Override
   public void close() {
+    if (isClosed()) {
+      return;
+    }
+
+    LOG.info("Closing {}.", FEDERATED_ZK_CLIENT);
 
+    synchronized (_zkRealmToZkClientMap) {
+      _zkRealmToZkClientMap.values().forEach(ZkClient::close);
+      _zkRealmToZkClientMap.clear();
+      _isClosed = true;
+    }
   }
 
   @Override
   public boolean isClosed() {
-    return false;
+    return _isClosed;
   }
 
   @Override
   public byte[] serialize(Object data, String path) {
-    return new byte[0];
+    return getZkClient(path).serialize(data, path);
   }
 
   @Override
   public <T> T deserialize(byte[] data, String path) {
-    return null;
+    return getZkClient(path).deserialize(data, path);
   }
 
   @Override
   public void setZkSerializer(ZkSerializer zkSerializer) {
-
+    _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer);
+    _zkRealmToZkClientMap.values()
+        .forEach(zkClient -> zkClient.setZkSerializer(_pathBasedZkSerializer));
   }
 
   @Override
   public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
-
+    _pathBasedZkSerializer = zkSerializer;
+    _zkRealmToZkClientMap.values().forEach(zkClient -> 
zkClient.setZkSerializer(zkSerializer));
   }
 
   @Override
   public PathBasedZkSerializer getZkSerializer() {
-    return null;
+    return _pathBasedZkSerializer;
+  }
+
+  private String create(final String path, final Object dataObject, final 
List<ACL> acl,
+      final CreateMode mode, final String expectedSessionId) {
+    if (mode.isEphemeral()) {
+      throwUnsupportedOperationException();
+    }
+
+    // Create mode is not session-aware, so the node does not have to be 
created
+    // by the expectedSessionId.
+    return getZkClient(path).create(path, dataObject, acl, mode);
+  }
+
+  private ZkClient getZkClient(String path) {
+    // If FederatedZkClient is closed, should not return ZkClient.
+    checkClosedState();
+
+    String zkRealm = getZkRealm(path);
+
+    // Use this zkClient reference to protect the returning zkClient from 
being null because of
+    // race condition.
+    ZkClient zkClient = _zkRealmToZkClientMap.get(zkRealm);
 
 Review comment:
   I don't agree that we should add the link to the Stack Overflow post, but 
feel free to explain here why you know this would work.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to