pkuwm commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r459267153



##########
File path: zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
##########
@@ -553,6 +554,52 @@ public void testCreateEphemeralWithValidSession()
     _zkClient.delete(path);
   }
 
+  /*
+   * This test validates that when ZK_AUTOSYNC_ENABLED_DEFAULT is enabled, sync() would be issued
+   * before handleNewSession. ZKclient would not see stale data.
+   */
+  @Test
+  public void testAutoSyncWithNewSessionEstablishment() throws Exception {
+    final String path = "/" + TestHelper.getTestMethodName();
+    final String data = "Hello Helix 2";
+
+    // Wait until the ZkClient has got a new session.
+    Assert.assertTrue(TestHelper
+        .verify(() -> _zkClient.getConnection().getZookeeperState().isConnected(), 1000L));
+
+    final long originalSessionId = _zkClient.getSessionId();
+
+    try {
+      // Create node.
+      _zkClient.create(path, data, CreateMode.PERSISTENT);
+    } catch (Exception ex) {
+      Assert.fail("Failed to create ephemeral node.", ex);
+    }
+
+    // Expire the original session.
+    ZkTestHelper.expireSession(_zkClient);
+
+    // Wait until the ZkClient has got a new session.
+    Assert.assertTrue(TestHelper.verify(() -> {

Review comment:
       No need for this check: `ZkTestHelper.expireSession()` already verifies that a new session is established.

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1234,85 @@ private void reconnect() {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, issueSync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly
+   *  one sync() to server. The sync() is to make sure the server would not see stale data.
+   *
+   *  ZooKeeper client object has an invariant of each object has one session. With this invariant
+   *  we can achieve each one sync() to server upon new session establishment. The reasoning is:
+   *  issueSync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus
+   *  we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by
+   *  the time sync() is invoked, the session expires. The sync() would fail with a stale session.
+   *  This is exactly what we want. The newer session would ensure another fireNewSessionEvents.
+   */
+  private boolean issueSync(ZooKeeper zk) {
+    String sessionId = Long.toHexString(zk.getSessionId());
+    ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+    final long startT = System.currentTimeMillis();
+    doAsyncSync(zk, SYNC_PATH, startT, callbackHandler);
+
+    callbackHandler.waitForSuccess();
+
+    KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc());
+    if (code == KeeperException.Code.OK) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {} and proceeds", sessionId,
+          code);
+      return true;
+    }
+
+    // Not retryable error, including session expiration; Log the error and return
+    LOG.warn(
+        "sycnOnNewSession with sessionID {} async return code: {} and not retryable, stop calling handleNewSession",
+        sessionId, code);
+    return false;
+  }
+
   private void fireNewSessionEvents() {
     // only managing zkclient fire handleNewSession event
     if (!isManagingZkConnection()) {
       return;
     }
     final String sessionId = getHexSessionId();
-    for (final IZkStateListener stateListener : _stateListener) {
-      _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
 
+    if (_syncOnNewSession) {
+      final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+      _eventThread.send(new ZkEventThread.ZkEvent("Sync call before new session event of session " + sessionId,
+          sessionId) {
         @Override
         public void run() throws Exception {
-          stateListener.handleNewSession(sessionId);
+          if (issueSync(zk) == false) {

Review comment:
       From your comment `The sync() would fail with a stale session.`, I understand that if the session expires, the subsequent events that would call handleNewSession for the expired session are skipped.
   
   I think there are cases other than session expiration that cause `issueSync()` to fail. If it fails, a warning is logged, but the state listeners still call handleNewSession. That conflicts with the log message "sycnOnNewSession with sessionID {} async return code: {} and not retryable, stop calling handleNewSession", and it leaves a possibility that handleNewSession reads stale data. Am I missing anything?
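
To make the concern concrete, here is a minimal sketch of one way the listener events could be tied to the sync outcome. It is not the PR's code: `NewSessionEventQueue`, the `BooleanSupplier` standing in for `issueSync(zk)`, and the single-threaded queue standing in for the event thread are all hypothetical. The sync event is queued first and records its result; each listener event checks that result before doing anything, so any sync failure (not only session expiration) skips handleNewSession.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical single-threaded stand-in for an event thread; not Helix's ZkEventThread.
class NewSessionEventQueue {
  private final Queue<Runnable> events = new ArrayDeque<>();
  // Records "listener:sessionId" for every listener that was actually notified.
  final List<String> handled = new ArrayList<>();

  // Queues one sync event, then one event per listener, in that order.
  void fireNewSessionEvents(String sessionId, BooleanSupplier sync, List<String> listeners) {
    AtomicBoolean synced = new AtomicBoolean(false);
    events.add(() -> synced.set(sync.getAsBoolean()));
    for (String listener : listeners) {
      events.add(() -> {
        // Listener events run only if the preceding sync succeeded.
        if (synced.get()) {
          handled.add(listener + ":" + sessionId);
        }
      });
    }
  }

  // Runs all queued events in FIFO order.
  void drain() {
    Runnable event;
    while ((event = events.poll()) != null) {
      event.run();
    }
  }
}
```

With a failing sync, `handled` stays empty after `drain()`; with a successful one, every registered listener is recorded once.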

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1234,85 @@ private void reconnect() {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, issueSync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly
+   *  one sync() to server. The sync() is to make sure the server would not see stale data.
+   *
+   *  ZooKeeper client object has an invariant of each object has one session. With this invariant
+   *  we can achieve each one sync() to server upon new session establishment. The reasoning is:
+   *  issueSync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus
+   *  we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by
+   *  the time sync() is invoked, the session expires. The sync() would fail with a stale session.
+   *  This is exactly what we want. The newer session would ensure another fireNewSessionEvents.
+   */
+  private boolean issueSync(ZooKeeper zk) {
+    String sessionId = Long.toHexString(zk.getSessionId());
+    ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+    final long startT = System.currentTimeMillis();
+    doAsyncSync(zk, SYNC_PATH, startT, callbackHandler);
+
+    callbackHandler.waitForSuccess();
+
+    KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc());
+    if (code == KeeperException.Code.OK) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {} and proceeds", sessionId,
+          code);
+      return true;
+    }
+
+    // Not retryable error, including session expiration; Log the error and return
+    LOG.warn(
+        "sycnOnNewSession with sessionID {} async return code: {} and not retryable, stop calling handleNewSession",

Review comment:
       I think this log should be outside of `issueSync`, since "stop calling handleNewSession" is the caller's logic. `issueSync` just reports whether the sync succeeded; it doesn't have to decide whether handleNewSession is called, right?
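
A rough sketch of that split, under stated assumptions: the class and method names below are hypothetical, `IntSupplier` stands in for the async sync call and its return code, and `java.util.logging` replaces the project's logger so the snippet is self-contained. The sync helper logs only what it knows (the return code), and the caller logs the decision about handleNewSession.

```java
import java.util.function.IntSupplier;
import java.util.logging.Logger;

// Hypothetical sketch; not the ZkClient implementation.
class SyncThenNotify {
  private static final Logger LOG = Logger.getLogger(SyncThenNotify.class.getName());
  static final int OK = 0; // assumed "success" return code for this sketch

  // Reports only whether the sync succeeded; knows nothing about listeners.
  static boolean issueSync(String sessionId, IntSupplier syncCall) {
    int rc = syncCall.getAsInt();
    if (rc != OK) {
      LOG.warning("sync for session " + sessionId + " failed with return code " + rc);
    }
    return rc == OK;
  }

  // The caller owns the decision, and the log message, about handleNewSession.
  static boolean onNewSession(String sessionId, IntSupplier syncCall, Runnable handleNewSession) {
    if (!issueSync(sessionId, syncCall)) {
      LOG.warning("skipping handleNewSession for session " + sessionId);
      return false;
    }
    handleNewSession.run();
    return true;
  }
}
```

Keeping the "skipping handleNewSession" message in the caller means the log can only appear where the skip actually happens.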

##########
File path: zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
##########
@@ -553,6 +554,52 @@ public void testCreateEphemeralWithValidSession()
     _zkClient.delete(path);
   }
 
+  /*
+   * This test validates that when ZK_AUTOSYNC_ENABLED_DEFAULT is enabled, sync() would be issued
+   * before handleNewSession. ZKclient would not see stale data.
+   */
+  @Test
+  public void testAutoSyncWithNewSessionEstablishment() throws Exception {
+    final String path = "/" + TestHelper.getTestMethodName();
+    final String data = "Hello Helix 2";
+
+    // Wait until the ZkClient has got a new session.
+    Assert.assertTrue(TestHelper

Review comment:
       Nit: I think `assertTrue(_zkClient.waitUntilConnected(1, TimeUnit.SECONDS));` would be better than polling.
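
For illustration only, a toy model of why a blocking wait tends to be preferable to polling: the waiter is released as soon as the state change is signalled, and the timeout is an upper bound rather than a polling interval. `ConnectionGate` is a hypothetical class built on `CountDownLatch`; it is not how ZkClient implements `waitUntilConnected`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client's connection state; not ZkClient.
class ConnectionGate {
  private final CountDownLatch connected = new CountDownLatch(1);

  // Called by whatever thread observes the "connected" event.
  void markConnected() {
    connected.countDown();
  }

  // Blocks until connected or until the timeout elapses; returns true if connected.
  boolean waitUntilConnected(long time, TimeUnit unit) {
    try {
      return connected.await(time, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

In a test this reads as a single assertion on `waitUntilConnected(1, TimeUnit.SECONDS)`, with no retry loop to tune.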

##########
File path: zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
##########
@@ -553,6 +554,52 @@ public void testCreateEphemeralWithValidSession()
     _zkClient.delete(path);
   }
 
+  /*
+   * This test validates that when ZK_AUTOSYNC_ENABLED_DEFAULT is enabled, sync() would be issued
+   * before handleNewSession. ZKclient would not see stale data.

Review comment:
       Do I understand correctly? This test doesn't really test sync, because there is only one ZK server. Data doesn't have to propagate to other ZK learners, so even without sync this test will always pass: the zkclient always connects to the same server, so it always sees the latest data.
    
   I think it may be difficult to test sync here. We could look at how ZooKeeper itself tests sync, which is more complicated: 
https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/test/java/org/apache/zookeeper/test/SyncCallTest.java
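
The point about a single server can be shown with a deliberately simplified model. `ToyEnsemble` below is hypothetical and ignores quorum commits, sessions, and networking; it only assumes that a follower can lag behind the leader until a sync brings it up to date. Reading from the same server that took the write always returns fresh data, which is why a one-server test cannot tell whether sync was issued.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a leader and one lagging follower; not ZooKeeper's replication protocol.
class ToyEnsemble {
  private final Map<String, String> leader = new HashMap<>();
  private final Map<String, String> follower = new HashMap<>();

  // Writes land on the leader; the follower does not see them yet.
  void write(String path, String data) {
    leader.put(path, data);
  }

  // Models sync(): the follower catches up with everything the leader has.
  void sync() {
    follower.putAll(leader);
  }

  String readFromLeader(String path) {
    return leader.get(path);
  }

  String readFromFollower(String path) {
    return follower.get(path);
  }
}
```

In this model a read from the follower before `sync()` returns `null` for a freshly written path, while a read from the leader returns the data with or without `sync()`. A test that only ever talks to one server corresponds to the second case.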
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


