kaisun2000 commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r454736430



##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
     }
   }
 
+  private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback 
implements AsyncCallback.VoidCallback {
+    private String _sessionId;
+
+    SyncCallbackHandler(String sessionId) {
+      _sessionId = sessionId;
+    }
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {}", 
_sessionId, rc);
+      callback(rc, path, ctx);
+    }
+
+    @Override
+    public void handle() {
+      // Make compiler happy, not used.
+    }
+
+    @Override
+    protected boolean needRetry(int rc) {
+      try {
+        switch (KeeperException.Code.get(rc)) {
+          /** Connection to the server has been lost */
+          case CONNECTIONLOSS:
+            return true;
+          default:
+            return false;
+        }
+      } catch (ClassCastException | NullPointerException ex) {
+        LOG.error("Session {} failed to handle unknown return code {}. Skip 
retrying. ex {}",
+            _sessionId, rc, ex);
+        return false;
+      }
+    }
+  }
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long 
startT,
+      final SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, false) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, startT, cb);
+          }
+        });
+  }
+
+  private boolean retrySync(String sessionId) throws ZkInterruptedException {
+    if (!_syncOnNewSession) {
+      return true;
+    }
+
+    SyncCallbackHandler callbackHandler = new SyncCallbackHandler(sessionId);
+
+    final ZkConnection zkConnection = (ZkConnection) getConnection();
+    final ZooKeeper zk = zkConnection.getZookeeper();
+    final long startT = System.currentTimeMillis();
+    doAsyncSync(zk, _syncPath, startT, callbackHandler);
+
+    callbackHandler.waitForSuccess();
+
+    KeeperException.Code code = 
KeeperException.Code.get(callbackHandler.getRc());
+    if (code == OK) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {} and 
proceeds", sessionId,
+          code);
+      return true;
+    }
+
+    // Not retryable error, including session expiration; Log the error and 
return
+    LOG.warn(
+        "sycnOnNewSession with sessionID {} async return code: {} and not 
retryable, stop calling handleNewSession",
+        sessionId, code);
+    return false;
+  }
+
   private void fireNewSessionEvents() {
     // only managing zkclient fire handleNewSession event
     if (!isManagingZkConnection()) {
       return;
     }
     final String sessionId = getHexSessionId();
+
+    _eventThread.send(
+        new ZkEventThread.ZkEvent("Sync call before new session event of 
session " + sessionId,
+            sessionId) {
+          @Override
+          public void run() throws Exception {
+            boolean syncStatus = retrySync(sessionId);

Review comment:
       fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to