pkuwm commented on a change in pull request #789: Add FederatedZkClient
URL: https://github.com/apache/helix/pull/789#discussion_r382951622
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
##########
@@ -20,344 +20,486 @@
*/
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements and supports all ZK operations defined in interface {@link
RealmAwareZkClient},
+ * except for session-aware operations such as creating ephemeral nodes, for
which
+ * an {@link UnsupportedOperationException} will be thrown.
+ * <p>
+ * It acts as a single ZK client but will automatically route
read/write/change subscription
+ * requests to the corresponding ZkClient with the help of metadata store
directory service.
+ * It could connect to multiple ZK addresses and maintain a {@link ZkClient}
for each ZK address.
+ */
+public class FederatedZkClient implements RealmAwareZkClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(FederatedZkClient.class);
+ private static final String FEDERATED_ZK_CLIENT =
FederatedZkClient.class.getSimpleName();
+ private static final String DEDICATED_ZK_CLIENT_FACTORY =
+ DedicatedZkClientFactory.class.getSimpleName();
+
+ private final MetadataStoreRoutingData _metadataStoreRoutingData;
+ private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
+
+ // ZK realm -> ZkClient
+ private final Map<String, ZkClient> _zkRealmToZkClientMap;
+
+ private volatile boolean _isClosed;
+ private PathBasedZkSerializer _pathBasedZkSerializer;
+
+ // TODO: support capacity of ZkClient number in one FederatedZkClient and do
garbage collection.
+ public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig
clientConfig,
+ MetadataStoreRoutingData metadataStoreRoutingData) {
+ if (metadataStoreRoutingData == null) {
+ throw new IllegalArgumentException("MetadataStoreRoutingData cannot be
null!");
+ }
+ if (clientConfig == null) {
+ throw new IllegalArgumentException("Client config cannot be null!");
+ }
+
+ _isClosed = false;
+ _clientConfig = clientConfig;
+ _pathBasedZkSerializer = clientConfig.getZkSerializer();
+ _metadataStoreRoutingData = metadataStoreRoutingData;
+ _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+ }
-public class FederatedZkClient implements RealmAwareZkClient {
@Override
public List<String> subscribeChildChanges(String path, IZkChildListener
listener) {
- return null;
+ return getZkClient(path).subscribeChildChanges(path, listener);
}
@Override
public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-
+ getZkClient(path).unsubscribeChildChanges(path, listener);
}
@Override
public void subscribeDataChanges(String path, IZkDataListener listener) {
-
+ getZkClient(path).subscribeDataChanges(path, listener);
}
@Override
public void unsubscribeDataChanges(String path, IZkDataListener listener) {
-
+ getZkClient(path).unsubscribeDataChanges(path, listener);
}
@Override
public void subscribeStateChanges(IZkStateListener listener) {
-
+ throwUnsupportedOperationException();
}
@Override
public void unsubscribeStateChanges(IZkStateListener listener) {
+ throwUnsupportedOperationException();
+ }
+ @Override
+ public void subscribeStateChanges(
+ org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener
listener) {
+ throwUnsupportedOperationException();
}
@Override
- public void unsubscribeAll() {
+ public void unsubscribeStateChanges(
+ org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener
listener) {
+ throwUnsupportedOperationException();
+ }
+ @Override
+ public void unsubscribeAll() {
+ _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
}
@Override
public void createPersistent(String path) {
-
+ createPersistent(path, false);
}
@Override
public void createPersistent(String path, boolean createParents) {
-
+ createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
@Override
public void createPersistent(String path, boolean createParents, List<ACL>
acl) {
-
+ getZkClient(path).createPersistent(path, createParents, acl);
}
@Override
public void createPersistent(String path, Object data) {
-
+ create(path, data, CreateMode.PERSISTENT);
}
@Override
public void createPersistent(String path, Object data, List<ACL> acl) {
-
+ create(path, data, acl, CreateMode.PERSISTENT);
}
@Override
public String createPersistentSequential(String path, Object data) {
- return null;
+ return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
}
@Override
public String createPersistentSequential(String path, Object data, List<ACL>
acl) {
- return null;
+ return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
}
@Override
public void createEphemeral(String path) {
-
+ create(path, null, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, String sessionId) {
-
+ createEphemeral(path, null, sessionId);
}
@Override
public void createEphemeral(String path, List<ACL> acl) {
-
+ create(path, null, acl, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, List<ACL> acl, String sessionId) {
-
+ create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
}
@Override
public String create(String path, Object data, CreateMode mode) {
- return null;
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
}
@Override
- public String create(String path, Object datat, List<ACL> acl, CreateMode
mode) {
- return null;
+ public String create(String path, Object data, List<ACL> acl, CreateMode
mode) {
+ return create(path, data, acl, mode, null);
}
@Override
public void createEphemeral(String path, Object data) {
-
+ create(path, data, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, Object data, String sessionId) {
-
+ create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
sessionId);
}
@Override
public void createEphemeral(String path, Object data, List<ACL> acl) {
-
+ create(path, data, acl, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, Object data, List<ACL> acl, String
sessionId) {
-
+ create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
}
@Override
public String createEphemeralSequential(String path, Object data) {
- return null;
+ return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
}
@Override
public String createEphemeralSequential(String path, Object data, List<ACL>
acl) {
- return null;
+ return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
}
@Override
public String createEphemeralSequential(String path, Object data, String
sessionId) {
- return null;
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
+ sessionId);
}
@Override
public String createEphemeralSequential(String path, Object data, List<ACL>
acl,
String sessionId) {
- return null;
+ return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
}
@Override
public List<String> getChildren(String path) {
- return null;
+ return getZkClient(path).getChildren(path);
}
@Override
public int countChildren(String path) {
- return 0;
+ return getZkClient(path).countChildren(path);
}
@Override
public boolean exists(String path) {
- return false;
+ return getZkClient(path).exists(path);
}
@Override
public Stat getStat(String path) {
- return null;
+ return getZkClient(path).getStat(path);
}
@Override
public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
- return false;
+ return getZkClient(path).waitUntilExists(path, timeUnit, time);
}
@Override
public void deleteRecursively(String path) {
-
+ getZkClient(path).deleteRecursively(path);
}
@Override
public boolean delete(String path) {
- return false;
+ return getZkClient(path).delete(path);
}
@Override
+ @SuppressWarnings("unchecked")
public <T> T readData(String path) {
- return null;
+ return (T) readData(path, false);
}
@Override
public <T> T readData(String path, boolean returnNullIfPathNotExists) {
- return null;
+ return getZkClient(path).readData(path, returnNullIfPathNotExists);
}
@Override
public <T> T readData(String path, Stat stat) {
- return null;
+ return getZkClient(path).readData(path, stat);
}
@Override
public <T> T readData(String path, Stat stat, boolean watch) {
- return null;
+ return getZkClient(path).readData(path, stat, watch);
}
@Override
public <T> T readDataAndStat(String path, Stat stat, boolean
returnNullIfPathNotExists) {
- return null;
+ return getZkClient(path).readData(path, stat, returnNullIfPathNotExists);
}
@Override
public void writeData(String path, Object object) {
-
+ writeData(path, object, -1);
}
@Override
public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
-
+ getZkClient(path).updateDataSerialized(path, updater);
}
@Override
- public void writeData(String path, Object datat, int expectedVersion) {
-
+ public void writeData(String path, Object data, int expectedVersion) {
+ writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public Stat writeDataReturnStat(String path, Object datat, int
expectedVersion) {
- return null;
+ public Stat writeDataReturnStat(String path, Object data, int
expectedVersion) {
+ return getZkClient(path).writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public Stat writeDataGetStat(String path, Object datat, int expectedVersion)
{
- return null;
+ public Stat writeDataGetStat(String path, Object data, int expectedVersion) {
+ return writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public void asyncCreate(String path, Object datat, CreateMode mode,
+ public void asyncCreate(String path, Object data, CreateMode mode,
ZkAsyncCallbacks.CreateCallbackHandler cb) {
-
+ getZkClient(path).asyncCreate(path, data, mode, cb);
}
@Override
- public void asyncSetData(String path, Object datat, int version,
+ public void asyncSetData(String path, Object data, int version,
ZkAsyncCallbacks.SetDataCallbackHandler cb) {
-
+ getZkClient(path).asyncSetData(path, data, version, cb);
}
@Override
public void asyncGetData(String path,
ZkAsyncCallbacks.GetDataCallbackHandler cb) {
-
+ getZkClient(path).asyncGetData(path, cb);
}
@Override
public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler
cb) {
-
+ getZkClient(path).asyncExists(path, cb);
}
@Override
public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler
cb) {
-
+ getZkClient(path).asyncDelete(path, cb);
}
@Override
public void watchForData(String path) {
-
+ getZkClient(path).watchForData(path);
}
@Override
public List<String> watchForChilds(String path) {
- return null;
+ return getZkClient(path).watchForChilds(path);
}
@Override
public long getCreationTime(String path) {
- return 0;
+ return getZkClient(path).getCreationTime(path);
}
@Override
public List<OpResult> multi(Iterable<Op> ops) {
+ throwUnsupportedOperationException();
return null;
}
@Override
public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+ throwUnsupportedOperationException();
return false;
}
@Override
public String getServers() {
+ throwUnsupportedOperationException();
return null;
}
@Override
public long getSessionId() {
- return 0;
+ // Session-aware is unsupported.
+ throwUnsupportedOperationException();
+ return 0L;
}
@Override
public void close() {
+ if (isClosed()) {
+ return;
+ }
+ LOG.info("Closing {}.", FEDERATED_ZK_CLIENT);
+
+ synchronized (_zkRealmToZkClientMap) {
+ _zkRealmToZkClientMap.values().forEach(ZkClient::close);
+ _zkRealmToZkClientMap.clear();
+ _isClosed = true;
+ }
}
@Override
public boolean isClosed() {
- return false;
+ return _isClosed;
}
@Override
public byte[] serialize(Object data, String path) {
- return new byte[0];
+ return getZkClient(path).serialize(data, path);
}
@Override
public <T> T deserialize(byte[] data, String path) {
- return null;
+ return getZkClient(path).deserialize(data, path);
}
@Override
public void setZkSerializer(ZkSerializer zkSerializer) {
-
+ _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer);
+ _zkRealmToZkClientMap.values()
+ .forEach(zkClient -> zkClient.setZkSerializer(_pathBasedZkSerializer));
}
@Override
public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
-
+ _pathBasedZkSerializer = zkSerializer;
+ _zkRealmToZkClientMap.values().forEach(zkClient ->
zkClient.setZkSerializer(zkSerializer));
}
@Override
public PathBasedZkSerializer getZkSerializer() {
- return null;
+ return _pathBasedZkSerializer;
+ }
+
+ private String create(final String path, final Object dataObject, final
List<ACL> acl,
+ final CreateMode mode, final String expectedSessionId) {
+ if (mode.isEphemeral()) {
+ throwUnsupportedOperationException();
+ }
+
+ // Create mode is not session-aware, so the node does not have to be
created
+ // by the expectedSessionId.
+ return getZkClient(path).create(path, dataObject, acl, mode);
+ }
+
+ private ZkClient getZkClient(String path) {
+ // If FederatedZkClient is closed, should not return ZkClient.
+ if (isClosed()) {
+ throw new IllegalStateException(FEDERATED_ZK_CLIENT + " is closed!");
+ }
+
+ String zkRealm = getZkRealm(path);
+ if (!_zkRealmToZkClientMap.containsKey(zkRealm)) {
+ // Synchronized to avoid creating duplicate ZkClient for the same
ZkRealm.
+ synchronized (_zkRealmToZkClientMap) {
+ if (!_zkRealmToZkClientMap.containsKey(zkRealm)) {
+ _zkRealmToZkClientMap.put(zkRealm, createZkClient(zkRealm));
+ }
+ }
+ }
+
+ return _zkRealmToZkClientMap.get(zkRealm);
+ }
+
+ private String getZkRealm(String path) {
+ String zkRealm;
+ try {
+ zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
+ } catch (NoSuchElementException ex) {
+ throw new NoSuchElementException("Cannot find ZK realm for the path: " +
path);
+ }
+
+ if (zkRealm == null || zkRealm.isEmpty()) {
+ throw new NoSuchElementException("Cannot find ZK realm for the path: " +
path);
+ }
+
+ return zkRealm;
+ }
+
+ private ZkClient createZkClient(String zkAddress) {
+ LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
+ return new ZkClient(new ZkConnection(zkAddress), (int)
_clientConfig.getConnectInitTimeout(),
+ _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
+ _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
+ _clientConfig.getMonitorInstanceName(),
_clientConfig.isMonitorRootPathOnly());
+ }
+
+ private void throwUnsupportedOperationException() {
+ throw new UnsupportedOperationException(
+ "Session-aware operation is not supported by " + FEDERATED_ZK_CLIENT
+ + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY + " for
this operation");
Review comment:
`DedicatedZkClientFactory` is what we provide and recommend users to create
a RealmAwareZkClient which is a `DedicatedZkClient` instance.
`DedicatedZkClient ` is protected and we don't want users to directly use it,
right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]