pkuwm commented on a change in pull request #765: Add DedicatedZkClient and 
update DedicatedZkClientFactory
URL: https://github.com/apache/helix/pull/765#discussion_r380968738
 
 

 ##########
 File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
 ##########
 @@ -0,0 +1,592 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkConnection;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * NOTE: DO NOT USE THIS CLASS DIRECTLY. Use DedicatedZkClientFactory to 
create instances of DedicatedZkClient.
+ *
+ * An implementation of the RealmAwareZkClient interface.
+ * Supports CRUD, data change subscription, and ephemeral mode operations.
+ */
+public class DedicatedZkClient implements RealmAwareZkClient {
+  private static Logger LOG = LoggerFactory.getLogger(DedicatedZkClient.class);
+
+  private final ZkClient _rawZkClient;
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final String _zkRealmShardingKey;
+  private final String _zkRealmAddress;
+
+  public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig 
connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+
+    if (connectionConfig == null) {
+      throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot 
be null!");
+    }
+    _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
+
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be 
null!");
+    }
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+
+    // Get the ZkRealm address based on the ZK path sharding key
+    String zkRealmAddress = 
_metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey);
+    if (zkRealmAddress == null || zkRealmAddress.isEmpty()) {
+      throw new IllegalArgumentException(
+          "ZK realm address for the given ZK realm sharding key is invalid! ZK 
realm address: "
+              + zkRealmAddress + " ZK realm sharding key: " + 
_zkRealmShardingKey);
+    }
+    _zkRealmAddress = zkRealmAddress;
+
+    // Create a ZK connection
+    IZkConnection zkConnection =
+        new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout());
+
+    // Create a ZkClient
+    _rawZkClient = new ZkClient(zkConnection, (int) 
clientConfig.getConnectInitTimeout(),
+        clientConfig.getOperationRetryTimeout(), 
clientConfig.getZkSerializer(),
+        clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
+        clientConfig.getMonitorInstanceName(), 
clientConfig.isMonitorRootPathOnly());
+  }
+
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener 
listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.subscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.unsubscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.subscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.unsubscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeStateChanges(IZkStateListener listener) {
+    _rawZkClient.subscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeStateChanges(IZkStateListener listener) {
+    _rawZkClient.unsubscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeAll() {
+    _rawZkClient.unsubscribeAll();
+  }
+
+  @Override
+  public void createPersistent(String path) {
+    createPersistent(path, false);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents) {
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents, List<ACL> 
acl) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.createPersistent(path, createParents, acl);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data) {
+    create(path, data, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data) {
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data, List<ACL> 
acl) {
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public void createEphemeral(String path) {
+    create(path, null, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, String sessionId) {
+    createEphemeral(path, null, sessionId);
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl) {
+    create(path, null, acl, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl, String sessionId) {
+    create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  /**
+   * Creates a node at the given path with the given data and create mode,
+   * using the open ACL (ZooDefs.Ids.OPEN_ACL_UNSAFE) as the default.
+   *
+   * @param path path of the node to create
+   * @param data payload to store in the node
+   * @param mode ZooKeeper create mode
+   * @return whatever the ACL-taking overload returns
+   */
+  @Override
+  public String create(String path, Object data, CreateMode mode) {
+    // Delegate to the ACL-taking overload. Calling create(path, data, mode)
+    // here would invoke this same method again and never terminate.
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+  }
+
+  @Override
+  public String create(String path, Object datat, List<ACL> acl, CreateMode 
mode) {
+    return create(path, datat, acl, mode, null);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data) {
+    create(path, data, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, String sessionId) {
+    create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, 
sessionId);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl, String 
sessionId) {
+    create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data) {
+    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> 
acl) {
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, String 
sessionId) {
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL_SEQUENTIAL,
+        sessionId);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> 
acl,
+      String sessionId) {
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
+  }
+
+  @Override
+  public List<String> getChildren(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.getChildren(path);
+  }
+
+  /**
+   * Counts the children of the node at the given path.
+   *
+   * @param path path whose children are counted
+   * @return the child count reported by the underlying ZkClient
+   * @throws IllegalArgumentException if checkIfPathBelongsToZkRealm
+   *         rejects the path
+   */
+  @Override
+  public int countChildren(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    // Delegate to the raw client. Calling countChildren(path) here would
+    // invoke this same method again and never terminate.
+    return _rawZkClient.countChildren(path);
+  }
+
+  @Override
+  public boolean exists(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.exists(path);
+  }
+
+  @Override
+  public Stat getStat(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.getStat(path);
+  }
+
+  @Override
+  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.waitUntilExists(path, timeUnit, time);
+  }
+
+  @Override
+  public void deleteRecursively(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.deleteRecursively(path);
+  }
+
+  @Override
+  public boolean delete(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.delete(path);
+  }
+
+  @Override
+  public <T> T readData(String path) {
+    return readData(path, false);
+  }
+
+  @Override
+  public <T> T readData(String path, boolean returnNullIfPathNotExists) {
+    T data = null;
+    try {
+      return readData(path, null);
+    } catch (ZkNoNodeException e) {
+      if (!returnNullIfPathNotExists) {
+        throw e;
+      }
+    }
+    return data;
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.readData(path, stat);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat, boolean watch) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.readData(path, stat, watch);
+  }
+
+  @Override
+  public <T> T readDataAndStat(String path, Stat stat, boolean 
returnNullIfPathNotExists) {
+    T data = null;
+    try {
+      data = readData(path, stat);
+    } catch (ZkNoNodeException e) {
+      if (!returnNullIfPathNotExists) {
+        throw e;
+      }
+    }
+    return data;
+  }
+
+  @Override
+  public void writeData(String path, Object object) {
+    writeData(path, object, -1);
+  }
+
+  @Override
+  public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.updateDataSerialized(path, updater);
+  }
+
+  @Override
+  public void writeData(String path, Object datat, int expectedVersion) {
+    writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataReturnStat(String path, Object datat, int 
expectedVersion) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    return _rawZkClient.writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) 
{
+    return writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public void asyncCreate(String path, Object datat, CreateMode mode,
+      ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.asyncCreate(path, datat, mode, cb);
+  }
+
+  @Override
+  public void asyncSetData(String path, Object datat, int version,
+      ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.asyncSetData(path, datat, version, cb);
+  }
+
+  @Override
+  public void asyncGetData(String path, 
ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.asyncGetData(path, cb);
+  }
+
+  @Override
+  public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler 
cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.asyncExists(path, cb);
+  }
+
+  @Override
+  public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler 
cb) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _rawZkClient.asyncDelete(path, cb);
+  }
+
+  @Override
+  public void watchForData(String path) {
+    if (!checkIfPathBelongsToZkRealm(path)) {
+      throw new IllegalArgumentException(
+          "The given path does not map to the ZK realm for this 
DedicatedZkClient! Path: " + path
+              + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
 
 Review comment:
   This looks like duplicated code. We could move the exception throwing into 
`checkIfPathBelongsToZkRealm` or `validateZkRealmPath()` so that the same 
check-and-throw block does not have to be repeated in each method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to