narendly commented on a change in pull request #796: WIP: Add
SharedZkClient/InnerSharedZkClient implementation
URL: https://github.com/apache/helix/pull/796#discussion_r383646864
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
##########
@@ -37,79 +51,467 @@
* HelixZkClient that uses shared ZkConnection.
* A SharedZkClient won't manipulate the shared ZkConnection directly.
*/
-public class SharedZkClient extends ZkClient implements HelixZkClient {
+public class SharedZkClient implements RealmAwareZkClient {
private static Logger LOG = LoggerFactory.getLogger(SharedZkClient.class);
- /*
- * Since we cannot really disconnect the ZkConnection, we need a dummy
ZkConnection placeholder.
- * This is to ensure connection field is never null even the shared
RealmAwareZkClient instance is closed so as to avoid NPE.
- */
- private final static ZkConnection IDLE_CONNECTION = new
ZkConnection("Dummy_ZkServers");
- private final OnCloseCallback _onCloseCallback;
- private final ZkConnectionManager _connectionManager;
-
- public interface OnCloseCallback {
- /**
- * Triggered after the RealmAwareZkClient is closed.
- */
- void onClose();
- }
-
- /**
- * Construct a shared RealmAwareZkClient that uses a shared ZkConnection.
- *
- * @param connectionManager The manager of the shared ZkConnection.
- * @param clientConfig ZkClientConfig details to create the shared
RealmAwareZkClient.
- * @param callback Clean up logic when the shared
RealmAwareZkClient is closed.
- */
- public SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig
clientConfig,
- OnCloseCallback callback) {
- super(connectionManager.getConnection(), 0,
clientConfig.getOperationRetryTimeout(),
- clientConfig.getZkSerializer(), clientConfig.getMonitorType(),
clientConfig.getMonitorKey(),
- clientConfig.getMonitorInstanceName(),
clientConfig.isMonitorRootPathOnly());
- _connectionManager = connectionManager;
- // Register to the base dedicated RealmAwareZkClient
- _connectionManager.registerWatcher(this);
- _onCloseCallback = callback;
+
+ private final HelixZkClient _innerSharedZkClient;
+ private final String _zkRealmShardingKey;
+ private final MetadataStoreRoutingData _metadataStoreRoutingData;
+ private final String _zkRealmAddress;
+
+ public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig
connectionConfig,
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+ MetadataStoreRoutingData metadataStoreRoutingData) {
+
+ if (connectionConfig == null) {
+ throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot
be null!");
+ }
+ _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
+
+ if (metadataStoreRoutingData == null) {
+ throw new IllegalArgumentException("MetadataStoreRoutingData cannot be
null!");
+ }
+ _metadataStoreRoutingData = metadataStoreRoutingData;
+
+ // TODO: use _zkRealmShardingKey to generate zkRealmAddress. This can done
the same way of pull 765 once @hunter check it in.
+ // Get the ZkRealm address based on the ZK path sharding key
+ String zkRealmAddress =
_metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey);
+ if (zkRealmAddress == null || zkRealmAddress.isEmpty()) {
+ throw new IllegalArgumentException(
+ "ZK realm address for the given ZK realm sharding key is invalid! ZK
realm address: "
+ + zkRealmAddress + " ZK realm sharding key: " +
_zkRealmShardingKey);
+ }
+ _zkRealmAddress = zkRealmAddress;
+
+ // Create an InnerSharedZkClient to actually serve ZK requests
+ // TODO: Rename HelixZkClient in the future or remove it entirely - this
will be a backward-compatibility breaking change because HelixZkClient is being
used by Helix users.
+
+ // Note, here delegate _innerSharedZkClient would share the same
connectionManager. Once the close() API of
+ // SharedZkClient is invoked, we can just call the close() API of delegate
_innerSharedZkClient. This would follow
+ // exactly the pattern of innerSharedZkClient closing logic, which would
close the connectionManager when the last
+ // sharedInnerZkClient is closed.
+ HelixZkClient.ZkConnectionConfig zkConnectionConfig =
+ new HelixZkClient.ZkConnectionConfig(zkRealmAddress)
+ .setSessionTimeout(connectionConfig.getSessionTimeout());
+ HelixZkClient.ZkClientConfig zkClientConfig = new
HelixZkClient.ZkClientConfig();
+ zkClientConfig.setZkSerializer(clientConfig.getZkSerializer())
+ .setConnectInitTimeout(clientConfig.getConnectInitTimeout())
+ .setOperationRetryTimeout(clientConfig.getOperationRetryTimeout())
+ .setMonitorInstanceName(clientConfig.getMonitorInstanceName())
+ .setMonitorKey(clientConfig.getMonitorKey())
+ .setMonitorType(clientConfig.getMonitorType())
+ .setMonitorRootPathOnly(clientConfig.isMonitorRootPathOnly());
+ _innerSharedZkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(zkConnectionConfig, zkClientConfig);
}
- @Override
- public void close() {
- super.close();
- if (isClosed()) {
- // Note that if register is not done while constructing, these private
fields may not be init yet.
- if (_connectionManager != null) {
- _connectionManager.unregisterWatcher(this);
- }
- if (_onCloseCallback != null) {
- _onCloseCallback.onClose();
+ private void checkPathAndThrow(String path) {
+ // TODO: replace with the singleton MetadataStoreRoutingData
+ try {
+ String zkRealmForPath =
_metadataStoreRoutingData.getMetadataStoreRealm(path);
+ if (!_zkRealmAddress.equals(zkRealmForPath)) {
+ throw new IllegalArgumentException("Given path: " + path + "'s ZK
realm: " + zkRealmForPath
+ + " does not match the ZK realm: " + _zkRealmAddress + " and
sharding key: "
+ + _zkRealmShardingKey + " for this DedicatedZkClient!");
}
+ } catch (NoSuchElementException e) {
+ throw new IllegalArgumentException(
+ "Given path: " + path + " does not have a valid sharding key!");
}
}
@Override
- public IZkConnection getConnection() {
- if (isClosed()) {
- return IDLE_CONNECTION;
- }
- return super.getConnection();
+ public List<String> subscribeChildChanges(String path, IZkChildListener
listener) {
+ checkPathAndThrow(path);
+ return _innerSharedZkClient.subscribeChildChanges(path, listener);
}
- /**
- * Since ZkConnection session is shared in this RealmAwareZkClient, do not
create ephemeral node using a SharedZKClient.
- */
@Override
- public String create(final String path, Object datat, final List<ACL> acl,
- final CreateMode mode) {
- if (mode.isEphemeral()) {
- throw new UnsupportedOperationException(
- "Create ephemeral nodes using a " +
SharedZkClient.class.getSimpleName()
- + " is not supported.");
- }
- return super.create(path, datat, acl, mode);
+ public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.unsubscribeChildChanges(path, listener);
+ }
+
+ @Override
+ public void subscribeDataChanges(String path, IZkDataListener listener) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.subscribeDataChanges(path, listener);
+ }
+
+ @Override
+ public void unsubscribeDataChanges(String path, IZkDataListener listener) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.unsubscribeDataChanges(path, listener);
+ }
+
+ @Override
+ public void subscribeStateChanges(IZkStateListener listener) {
+ _innerSharedZkClient.subscribeStateChanges(listener);
+ }
+
+ @Override
+ public void unsubscribeStateChanges(IZkStateListener listener) {
+ _innerSharedZkClient.unsubscribeStateChanges(listener);
+ }
+
+ @Override
+ public void unsubscribeAll() {
+ _innerSharedZkClient.unsubscribeAll();
+ }
+
+ @Override
+ public void createPersistent(String path) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.createPersistent(path);
+ }
+
+ @Override
+ public void createPersistent(String path, boolean createParents) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.createPersistent(path, createParents);
+ }
+
+ @Override
+ public void createPersistent(String path, boolean createParents, List<ACL>
acl) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.createPersistent(path, createParents, acl);
+ }
+
+ @Override
+ public void createPersistent(String path, Object data) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.createPersistent(path, data);
+ }
+
+ @Override
+ public void createPersistent(String path, Object data, List<ACL> acl) {
+ checkPathAndThrow(path);
+ _innerSharedZkClient.createPersistent(path, data, acl);
+ }
+
+ @Override
+ public String createPersistentSequential(String path, Object data) {
+ checkPathAndThrow(path);
+ return _innerSharedZkClient.createPersistentSequential(path, data);
+ }
+
+ @Override
+ public String createPersistentSequential(String path, Object data, List<ACL>
acl) {
+ checkPathAndThrow(path);
+ return _innerSharedZkClient.createPersistentSequential(path, data, acl);
}
@Override
- protected boolean isManagingZkConnection() {
- return false;
+ public void createEphemeral(String path) {
+ throw new UnsupportedOperationException(
+ "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
Review comment:
Nit: "Creating ephemeral nodes..."
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]