desaikomal commented on code in PR #2409:
URL: https://github.com/apache/helix/pull/2409#discussion_r1143769786


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable, 
IZkStateListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final long _initConnectionTimeout;
   private final long _reconnectTimeout;
 
+  private final ScheduledExecutorService _zkClientReconnectMonitor;
+  private ScheduledFuture<?> _reconnectMonitorFuture;
+  private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
+
   public ZkMetaClient(ZkMetaClientConfig config) {
-    _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
+      _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
     _reconnectTimeout = 
config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
     // TODO: Right new ZkClient reconnect using exp backoff with fixed max 
backoff interval. We should
-    // 1. Allow user to config max backoff interval (next PR)
-    // 2. Allow user to config reconnect policy (future PR)
+    // 1. Allow user to config max and init backoff interval

Review Comment:
   Too many knobs are not always a good thing, unless we have a strong API 
compatibility requirement.



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable, 
IZkStateListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final long _initConnectionTimeout;
   private final long _reconnectTimeout;
 
+  private final ScheduledExecutorService _zkClientReconnectMonitor;
+  private ScheduledFuture<?> _reconnectMonitorFuture;
+  private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
+
   public ZkMetaClient(ZkMetaClientConfig config) {
-    _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
+      _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();

Review Comment:
   nit: does this have an extra space?



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable, 
IZkStateListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final long _initConnectionTimeout;
   private final long _reconnectTimeout;
 
+  private final ScheduledExecutorService _zkClientReconnectMonitor;

Review Comment:
   nit: suggestion - if adding comments is allowed, can you add a comment on what 
the purpose of this executor service is? Not sure what the coding guidelines are.



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -266,18 +279,25 @@ public void asyncSet(String key, T data, int version, 
AsyncCallback.StatCallback
 
   @Override
   public void connect() {
-    // TODO: throws IllegalStateException when already connected
     try {
+      _zkClientConnectionMutex.lock();
       _zkClient.connect(_initConnectionTimeout, _zkClient);
+      // register this client as state change listener to react to ZkClient 
EXPIRED event.
+      // When ZkClient has expired connection to ZK, it sill auto reconnect 
until ZkClient

Review Comment:
   nit: sill -> still



##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java:
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+
+public class TestConnectStateChangeListenerAndRetry  {
+  protected static final String ZK_ADDR = "localhost:2181";
+  protected static ZkServer _zkServer;
+
+  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
+  /**
+   * Simulate a zk state change by calling {@link 
ZkClient#process(WatchedEvent)} directly
+   */
+  private static void simulateZkStateReconnected(ZkClient zkClient) throws 
InterruptedException {
+      WatchedEvent event =
+          new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected,
+              null);
+      zkClient.process(event);
+
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+      event = new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.SyncConnected,
+          null);
+      zkClient.process(event);
+  }
+
+  @BeforeSuite
+  public void prepare() {
+    // start local zookeeper server
+    _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void cleanUp() {
+
+  }
+
+  @Test
+  public void testConnectState() {
+    try (ZkMetaClient<String> zkMetaClient = 
createZkMetaClientReconnectTest()) {
+      zkMetaClient.connect();
+      zkMetaClient.connect();
+      Assert.fail("The second connect should throw IllegalStateException");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IllegalStateException);
+    }
+  }
+
+  // test mock zkclient event
+  @Test(dependsOnMethods = "testConnectState")
+  public void testReConnectSucceed() throws InterruptedException {
+    try (ZkMetaClient<String> zkMetaClient = 
createZkMetaClientReconnectTest()) {
+      zkMetaClient.connect();
+      simulateZkStateReconnected(zkMetaClient.getZkClient());
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD);
+      // When ZK reconnect happens within timeout window, zkMetaClient should 
ba able to perform CRUD.
+      
Assert.assertTrue(zkMetaClient.getZkClient().getConnection().getZookeeperState().isConnected());

Review Comment:
   Can you also test the CRUD operations that, as the comment says, should work?



##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java:
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+
+public class TestConnectStateChangeListenerAndRetry  {
+  protected static final String ZK_ADDR = "localhost:2181";
+  protected static ZkServer _zkServer;
+
+  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
+  /**
+   * Simulate a zk state change by calling {@link 
ZkClient#process(WatchedEvent)} directly
+   */
+  private static void simulateZkStateReconnected(ZkClient zkClient) throws 
InterruptedException {
+      WatchedEvent event =
+          new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected,
+              null);
+      zkClient.process(event);
+
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+      event = new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.SyncConnected,
+          null);
+      zkClient.process(event);
+  }
+
+  @BeforeSuite
+  public void prepare() {
+    // start local zookeeper server
+    _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void cleanUp() {
+
+  }
+
+  @Test
+  public void testConnectState() {
+    try (ZkMetaClient<String> zkMetaClient = 
createZkMetaClientReconnectTest()) {
+      zkMetaClient.connect();
+      zkMetaClient.connect();

Review Comment:
   Should it say "already connected"?



##########
meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java:
##########
@@ -34,20 +35,42 @@ public class ExponentialBackoffReconnectPolicy implements 
MetaClientReconnectPol
 
   private final long _maxBackOffInterval;
   private final long _initBackoffInterval;
+  private final long _autoReconnectTimeout;
 
   @Override
   public RetryPolicyName getPolicyName() {
     return RetryPolicyName.EXP_BACKOFF;
   }
 
+  @Override
+  public long getAutoReconnectTimeout() {
+    return _autoReconnectTimeout;
+  }
+
   public ExponentialBackoffReconnectPolicy() {
     _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
     _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+    _autoReconnectTimeout = DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
+  }
+
+  public ExponentialBackoffReconnectPolicy(long autoReconnectTimeout) {
+    _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+    _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+    _autoReconnectTimeout = autoReconnectTimeout;
   }
 
-  public ExponentialBackoffReconnectPolicy(long maxBackOffInterval, long 
initBackoffInterval) {
+  // TODO: Allow user to pass maxBackOffInterval and initBackoffInterval.

Review Comment:
   If we don't use it, then please remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to