kaisun2000 commented on a change in pull request #770: WIP: Add SharedZkClient 
and update SharedZkClientFactory
URL: https://github.com/apache/helix/pull/770#discussion_r380966871
 
 

 ##########
 File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
 ##########
 @@ -37,79 +50,541 @@
  * HelixZkClient that uses shared ZkConnection.
  * A SharedZkClient won't manipulate the shared ZkConnection directly.
  */
-public class SharedZkClient extends ZkClient implements HelixZkClient {
+public class SharedZkClient implements RealmAwareZkClient {
   private static Logger LOG = LoggerFactory.getLogger(SharedZkClient.class);
-  /*
-   * Since we cannot really disconnect the ZkConnection, we need a dummy 
ZkConnection placeholder.
-   * This is to ensure connection field is never null even the shared 
RealmAwareZkClient instance is closed so as to avoid NPE.
-   */
-  private final static ZkConnection IDLE_CONNECTION = new 
ZkConnection("Dummy_ZkServers");
-  private final OnCloseCallback _onCloseCallback;
-  private final ZkConnectionManager _connectionManager;
-
-  public interface OnCloseCallback {
-    /**
-     * Triggered after the RealmAwareZkClient is closed.
-     */
-    void onClose();
-  }
-
-  /**
-   * Construct a shared RealmAwareZkClient that uses a shared ZkConnection.
-   *
-   * @param connectionManager     The manager of the shared ZkConnection.
-   * @param clientConfig          ZkClientConfig details to create the shared 
RealmAwareZkClient.
-   * @param callback              Clean up logic when the shared 
RealmAwareZkClient is closed.
-   */
-  public SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig 
clientConfig,
-      OnCloseCallback callback) {
-    super(connectionManager.getConnection(), 0, 
clientConfig.getOperationRetryTimeout(),
-        clientConfig.getZkSerializer(), clientConfig.getMonitorType(), 
clientConfig.getMonitorKey(),
-        clientConfig.getMonitorInstanceName(), 
clientConfig.isMonitorRootPathOnly());
-    _connectionManager = connectionManager;
-    // Register to the base dedicated RealmAwareZkClient
-    _connectionManager.registerWatcher(this);
-    _onCloseCallback = callback;
+
+  private final HelixZkClient _innerSharedZkClient;
+  private final Map<String, String> _routingDataCache; // TODO: replace with 
RoutingDataCache
+  private final String _zkRealmShardingKey;
+
+  public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig 
connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      Map<String, String> routingDataCache) {
+
+    if (connectionConfig == null) {
+      throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot 
be null!");
+    }
+    _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
+
+    // TODO: Replace this Map with a real RoutingDataCache
+    if (routingDataCache == null) {
+      throw new IllegalArgumentException("RoutingDataCache cannot be null!");
+    }
+    _routingDataCache = routingDataCache;
+
+    // Get the ZkRealm address based on the ZK path sharding key
+    String zkRealmAddress = _routingDataCache.get(_zkRealmShardingKey);
+
+    // Create an InnerSharedZkClient to actually serve ZK requests
+    // TODO: Rename HelixZkClient in the future or remove it entirely - this 
will be a backward-compatibility breaking change because HelixZkClient is being 
used by Helix users.
+    HelixZkClient.ZkConnectionConfig zkConnectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkRealmAddress)
+            .setSessionTimeout(connectionConfig.getSessionTimeout());
+    _innerSharedZkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(zkConnectionConfig, (HelixZkClient.ZkClientConfig) 
clientConfig);
   }
 
   @Override
-  public void close() {
-    super.close();
-    if (isClosed()) {
-      // Note that if register is not done while constructing, these private 
fields may not be init yet.
-      if (_connectionManager != null) {
-        _connectionManager.unregisterWatcher(this);
+  public List<String> subscribeChildChanges(String path, IZkChildListener 
listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.subscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.unsubscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.subscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.unsubscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeStateChanges(IZkStateListener listener) {
+    _innerSharedZkClient.subscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeStateChanges(IZkStateListener listener) {
+    _innerSharedZkClient.unsubscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeAll() {
+    _innerSharedZkClient.unsubscribeAll();
+  }
+
+  @Override
+  public void createPersistent(String path) {
+    createPersistent(path, false);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents) {
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents, List<ACL> 
acl) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.createPersistent(path, createParents, acl);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data) {
+    create(path, data, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data) {
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data, List<ACL> 
acl) {
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public void createEphemeral(String path) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String create(String path, Object data, CreateMode mode) {
+    // Delegate to the ACL-taking overload, which performs the ZK realm check and
+    // forwards to the inner client. Calling create(path, data, mode) here would
+    // re-enter this same method and recurse until StackOverflowError.
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+  }
+
+  @Override
+  public String create(String path, Object datat, List<ACL> acl, CreateMode 
mode) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.create(path, datat, acl, mode);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl, String 
sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> 
acl) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, String 
sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> 
acl,
+      String sessionId) {
+    throw new UnsupportedOperationException(
+        "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public List<String> getChildren(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.getChildren(path);
+  }
+
+  @Override
+  public int countChildren(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    // Forward to the inner client; calling countChildren(path) on this object
+    // would re-enter this method and recurse until StackOverflowError.
+    return _innerSharedZkClient.countChildren(path);
+  }
+
+  @Override
+  public boolean exists(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.exists(path);
+  }
+
+  @Override
+  public Stat getStat(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.getStat(path);
+  }
+
+  @Override
+  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.waitUntilExists(path, timeUnit, time);
+  }
+
+  @Override
+  public void deleteRecursively(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.deleteRecursively(path);
+  }
+
+  @Override
+  public boolean delete(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.delete(path);
+  }
+
+  @Override
+  public <T> T readData(String path) {
+    return readData(path, false);
+  }
+
+  @Override
+  public <T> T readData(String path, boolean returnNullIfPathNotExists) {
+    T data = null;
+    try {
+      return readData(path, null);
+    } catch (ZkNoNodeException e) {
+      if (!returnNullIfPathNotExists) {
+        throw e;
       }
-      if (_onCloseCallback != null) {
-        _onCloseCallback.onClose();
+    }
+    return data;
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.readData(path, stat);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat, boolean watch) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.readData(path, stat, watch);
+  }
+
+  @Override
+  public <T> T readDataAndStat(String path, Stat stat, boolean 
returnNullIfPathNotExists) {
+    T data = null;
+    try {
+      data = readData(path, stat);
+    } catch (ZkNoNodeException e) {
+      if (!returnNullIfPathNotExists) {
+        throw e;
       }
     }
+    return data;
+  }
+
+  @Override
+  public void writeData(String path, Object object) {
+    writeData(path, object, -1);
   }
 
   @Override
-  public IZkConnection getConnection() {
-    if (isClosed()) {
-      return IDLE_CONNECTION;
+  public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
     }
-    return super.getConnection();
+    _innerSharedZkClient.updateDataSerialized(path, updater);
+  }
+
+  @Override
+  public void writeData(String path, Object datat, int expectedVersion) {
+    writeDataReturnStat(path, datat, expectedVersion);
   }
 
-  /**
-   * Since ZkConnection session is shared in this RealmAwareZkClient, do not 
create ephemeral node using a SharedZKClient.
-   */
   @Override
-  public String create(final String path, Object datat, final List<ACL> acl,
-      final CreateMode mode) {
-    if (mode.isEphemeral()) {
-      throw new UnsupportedOperationException(
-          "Create ephemeral nodes using a " + 
SharedZkClient.class.getSimpleName()
-              + " is not supported.");
+  public Stat writeDataReturnStat(String path, Object datat, int 
expectedVersion) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
     }
-    return super.create(path, datat, acl, mode);
+    return _innerSharedZkClient.writeDataReturnStat(path, datat, 
expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) 
{
+    return writeDataReturnStat(path, datat, expectedVersion);
   }
 
   @Override
-  protected boolean isManagingZkConnection() {
-    return false;
+  public void asyncCreate(String path, Object datat, CreateMode mode,
+      ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.asyncCreate(path, datat, mode, cb);
+  }
+
+  @Override
+  public void asyncSetData(String path, Object datat, int version,
+      ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.asyncSetData(path, datat, version, cb);
+  }
+
+  @Override
+  public void asyncGetData(String path, 
ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.asyncGetData(path, cb);
+  }
+
+  @Override
+  public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler 
cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.asyncExists(path, cb);
+  }
+
+  @Override
+  public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler 
cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.asyncDelete(path, cb);
+  }
+
+  @Override
+  public void watchForData(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _innerSharedZkClient.watchForData(path);
+  }
+
+  @Override
+  public List<String> watchForChilds(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.watchForChilds(path);
+  }
+
+  @Override
+  public long getCreationTime(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _innerSharedZkClient.getCreationTime(path);
+  }
+
+  @Override
+  public List<OpResult> multi(Iterable<Op> ops) {
+    return _innerSharedZkClient.multi(ops);
+  }
+
+  @Override
+  public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+    return _innerSharedZkClient.waitUntilConnected(time, timeUnit);
+  }
+
+  @Override
+  public String getServers() {
+    return _innerSharedZkClient.getServers();
+  }
+
+  @Override
+  public long getSessionId() {
+    return _innerSharedZkClient.getSessionId();
+  }
+
+  @Override
+  public void close() {
+    _innerSharedZkClient.close();
 
 Review comment:
   This should be fine, assuming assertion 1 holds: _innerSharedZkClient keeps 
the original behavior, namely that the last client to be closed also closes 
the underlying ZkConnectionManager. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to