qqu0127 commented on code in PR #2409:
URL: https://github.com/apache/helix/pull/2409#discussion_r1143968322
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -266,18 +279,25 @@ public void asyncSet(String key, T data, int version,
AsyncCallback.StatCallback
@Override
public void connect() {
- // TODO: throws IllegalStateException when already connected
try {
+ _zkClientConnectionMutex.lock();
_zkClient.connect(_initConnectionTimeout, _zkClient);
+ // register this client as state change listener to react to ZkClient
EXPIRED event.
+ // When ZkClient has expired connection to ZK, it sill auto reconnect
until ZkClient
+ // is closed or connection re-established.
+ // We will need to close ZkClient when user set retry connection timeout.
+ _zkClient.subscribeStateChanges(this);
Review Comment:
This is somewhat confusing. Can we create a standalone state change listener?
(The MetaClient and the listener operate at two different levels.)
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
public T deserialize(byte[] bytes, String path) {
return _zkClient.deserialize(bytes, path);
}
+
+ /**
+ * A clean up method called when connect state change or MetaClient is
closing.
+ * @param cancel If we want to cancel the reconnect monitor thread.
+ * @param close If we want to close ZkClient.
+ */
+ private void cleanUpAndClose(boolean cancel, boolean close) {
+ _zkClientConnectionMutex.lock();
+
+ if (close && !_zkClient.isClosed()) {
+ _zkClient.close();
+ LOG.info("ZkClient is closed");
+ }
+
+ if (cancel && _reconnectMonitorFuture != null) {
+ _reconnectMonitorFuture.cancel(true);
+ LOG.info("ZkClient reconnect monitor thread is canceled");
+ }
+
+ _zkClientConnectionMutex.unlock();
+ }
+
+ // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+ // Cancel the monitor thread when connected.
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state) throws
Exception {
+ if (state == Watcher.Event.KeeperState.Disconnected) {
+ // Expired. start a new event monitoring retry
+ _zkClientConnectionMutex.lockInterruptibly();
+ if (_reconnectMonitorFuture == null ||
_reconnectMonitorFuture.isCancelled()
+ || _reconnectMonitorFuture.isDone()) {
+ _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+ if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+ cleanUpAndClose(false, true);
+ }
+ }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+ LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after
{}",
+ _reconnectTimeout);
+ }
+ _zkClientConnectionMutex.unlock();
Review Comment:
Should we release the lock in a `finally` block?
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable,
IZkStateListener {
private static final Logger LOG =
LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final long _initConnectionTimeout;
private final long _reconnectTimeout;
+ private final ScheduledExecutorService _zkClientReconnectMonitor;
+ private ScheduledFuture<?> _reconnectMonitorFuture;
+ private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
Review Comment:
Could you please add comments on what this lock is used for?
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
public T deserialize(byte[] bytes, String path) {
return _zkClient.deserialize(bytes, path);
}
+
+ /**
+ * A clean up method called when connect state change or MetaClient is
closing.
+ * @param cancel If we want to cancel the reconnect monitor thread.
+ * @param close If we want to close ZkClient.
+ */
+ private void cleanUpAndClose(boolean cancel, boolean close) {
+ _zkClientConnectionMutex.lock();
+
+ if (close && !_zkClient.isClosed()) {
+ _zkClient.close();
+ LOG.info("ZkClient is closed");
+ }
+
+ if (cancel && _reconnectMonitorFuture != null) {
+ _reconnectMonitorFuture.cancel(true);
+ LOG.info("ZkClient reconnect monitor thread is canceled");
+ }
+
+ _zkClientConnectionMutex.unlock();
Review Comment:
We should make sure the lock is always released, even if `_zkClient.close()` throws (e.g., unlock in a `finally` block).
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
public T deserialize(byte[] bytes, String path) {
return _zkClient.deserialize(bytes, path);
}
+
+ /**
+ * A clean up method called when connect state change or MetaClient is
closing.
+ * @param cancel If we want to cancel the reconnect monitor thread.
+ * @param close If we want to close ZkClient.
+ */
+ private void cleanUpAndClose(boolean cancel, boolean close) {
+ _zkClientConnectionMutex.lock();
+
+ if (close && !_zkClient.isClosed()) {
+ _zkClient.close();
+ LOG.info("ZkClient is closed");
+ }
+
+ if (cancel && _reconnectMonitorFuture != null) {
+ _reconnectMonitorFuture.cancel(true);
+ LOG.info("ZkClient reconnect monitor thread is canceled");
+ }
+
+ _zkClientConnectionMutex.unlock();
+ }
+
+ // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+ // Cancel the monitor thread when connected.
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state) throws
Exception {
+ if (state == Watcher.Event.KeeperState.Disconnected) {
+ // Expired. start a new event monitoring retry
+ _zkClientConnectionMutex.lockInterruptibly();
+ if (_reconnectMonitorFuture == null ||
_reconnectMonitorFuture.isCancelled()
+ || _reconnectMonitorFuture.isDone()) {
+ _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+ if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+ cleanUpAndClose(false, true);
+ }
+ }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+ LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after
{}",
+ _reconnectTimeout);
+ }
Review Comment:
Can this be simplified by checking the state and the `_reconnectMonitorFuture`
condition together, and grabbing the lock only if necessary?
##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable,
IZkStateListener {
private static final Logger LOG =
LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final long _initConnectionTimeout;
private final long _reconnectTimeout;
+ private final ScheduledExecutorService _zkClientReconnectMonitor;
+ private ScheduledFuture<?> _reconnectMonitorFuture;
+ private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
Review Comment:
Also, what is its relationship with `getEventLock()._stateChangedCondition` in
the native ZkClient?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]