jiajunwang commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r447314395
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1165,20 +1177,92 @@ private void reconnect() {
}
}
+ private class SyncContext {
+ private CountDownLatch _latch;
+ private AtomicInteger _rc;
+
+ public SyncContext(CountDownLatch latch) {
+ _latch = latch;
+ }
+
+ AtomicInteger getRc() {
+ return _rc;
+ }
+
+ void setRc(AtomicInteger rc) {
+ _rc = rc;
+ }
+
+ CountDownLatch getLatch() {
+ return _latch;
+ }
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+ while (true) {
Review comment:
As commented, please check out the other callback implementations. I
think the main logic is in common. So we don't need to re-define everything.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -103,6 +111,9 @@
// ZkEventThread. Otherwise the retry request might block the normal event
processing.
protected final ZkAsyncRetryThread _asyncCallRetryThread;
+ private final boolean _syncOnNewSession;
+ final String _syncPath = new String("/");
Review comment:
private static?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -103,6 +111,9 @@
// ZkEventThread. Otherwise the retry request might block the normal event
processing.
protected final ZkAsyncRetryThread _asyncCallRetryThread;
+ private final boolean _syncOnNewSession;
+ final String _syncPath = new String("/");
Review comment:
Directly do _syncPath = "/"; shall help to avoid unnecessary object
creating.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1165,20 +1177,92 @@ private void reconnect() {
}
}
+ private class SyncContext {
Review comment:
Please add the callback logic to ZkAsyncCallbacks.java if possible.
1. This file has been very large. We shall avoid adding more optional
content here.
2. ZkAsyncCallbacks.java has all the callback defines there. We might be
able to avoid some duplicated code if the class is defined there and extending
the default callback class.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1165,20 +1177,92 @@ private void reconnect() {
}
}
+ private class SyncContext {
+ private CountDownLatch _latch;
+ private AtomicInteger _rc;
+
+ public SyncContext(CountDownLatch latch) {
+ _latch = latch;
+ }
+
+ AtomicInteger getRc() {
+ return _rc;
+ }
+
+ void setRc(AtomicInteger rc) {
+ _rc = rc;
+ }
+
+ CountDownLatch getLatch() {
+ return _latch;
+ }
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+ while (true) {
+ CountDownLatch latch = new CountDownLatch(1);
+ SyncContext ctx = new SyncContext(latch);
+
+ final ZkConnection zkConnection = (ZkConnection) getConnection();
+ zkConnection.getZookeeper().sync(_syncPath, new
AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rt, String s, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
sessionId, rt);
+ SyncContext synCtx = ((SyncContext) ctx);
+ synCtx.setRc(new AtomicInteger(rt));
+ synCtx.getLatch().countDown();
+ }
+ }, ctx);
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.info("retrySync latch waiting got interrrupted with sessionId {}
", sessionId);
+ throw new ZkInterruptedException(e);
+ }
+
+ KeeperException.Code code = KeeperException.Code.get(ctx.getRc().get());
+ if (code == OK) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and
proceeds", sessionId,
+ code);
+ break;
+ }
+ if (code == CONNECTIONLOSS || code == SESSIONMOVED) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and
retry", sessionId,
+ code);
+ continue;
+ }
+ // Not retryable, including session expiration; but having error. Log
the error and return
+ LOG.error(
+ "sycnOnNewSession with sessionID {} async return code: {} and not
retryable, stop calling handleNewSession",
+ sessionId, code);
+ return false;
+ }
+ return true;
+ }
+
private void fireNewSessionEvents() {
// only managing zkclient fire handleNewSession event
if (!isManagingZkConnection()) {
return;
}
final String sessionId = getHexSessionId();
for (final IZkStateListener stateListener : _stateListener) {
- _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to "
+ stateListener, sessionId) {
+ _eventThread
+ .send(new ZkEventThread.ZkEvent("New session event sent to " +
stateListener, sessionId) {
- @Override
- public void run() throws Exception {
- stateListener.handleNewSession(sessionId);
- }
- });
+ @Override
+ public void run() throws Exception {
+ boolean proceed = retrySync(sessionId);
Review comment:
Do we need to do this for every single state listener?
We shall only do it once, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]