kaisun2000 commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r459652434
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1234,85 @@ private void reconnect() {
}
}
+
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, true) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+ }
+ });
+ }
+
+ /*
+ * Note, issueSync takes a ZooKeeper (client) object and pass it to
doAsyncSync().
+ * The reason we do this is that we want to ensure each new session event
is preceded with exactly
+ * one sync() to server. The sync() is to make sure the server would not
see stale data.
+ *
+ * ZooKeeper client object has an invariant of each object has one session.
With this invariant
+ * we can achieve each one sync() to server upon new session establishment.
The reasoning is:
+ * issueSync() is called when fireNewSessionEvents() which in under
eventLock of ZkClient. Thus
+ * we are guaranteed the ZooKeeper object passed in would have the new
incoming sessionId. If by
+ * the time sync() is invoked, the session expires. The sync() would fail
with a stale session.
+ * This is exactly what we want. The newer session would ensure another
fireNewSessionEvents.
+ */
+ private boolean issueSync(ZooKeeper zk) {
+ String sessionId = Long.toHexString(zk.getSessionId());
+ ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+ new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+ final long startT = System.currentTimeMillis();
+ doAsyncSync(zk, SYNC_PATH, startT, callbackHandler);
+
+ callbackHandler.waitForSuccess();
+
+ KeeperException.Code code =
KeeperException.Code.get(callbackHandler.getRc());
+ if (code == KeeperException.Code.OK) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and
proceeds", sessionId,
+ code);
+ return true;
+ }
+
+ // Not retryable error, including session expiration; Log the error and
return
+ LOG.warn(
+ "sycnOnNewSession with sessionID {} async return code: {} and not
retryable, stop calling handleNewSession",
+ sessionId, code);
+ return false;
+ }
+
private void fireNewSessionEvents() {
// only managing zkclient fire handleNewSession event
if (!isManagingZkConnection()) {
return;
}
final String sessionId = getHexSessionId();
- for (final IZkStateListener stateListener : _stateListener) {
- _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to "
+ stateListener, sessionId) {
+ if (_syncOnNewSession) {
+ final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+ _eventThread.send(new ZkEventThread.ZkEvent("Sync call before new
session event of session " + sessionId,
+ sessionId) {
@Override
public void run() throws Exception {
- stateListener.handleNewSession(sessionId);
+ if (issueSync(zk) == false) {
Review comment:
As discussed offline, we can treat the current approach as a best-effort
one. The main trade-off is that waiting for the async sync() to succeed
also carries a performance hit. (That is the reason why sync() only has an
async version.) The designer of this API seems to have envisioned that one can
issue an async sync() without checking the result and then issue all the other
reads, knowing they would observe state no older than the sync() point.
Feel free to file another issue/request; if this turns out to be problematic
in the field, let us revisit it.
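
To make the fire-and-forget idea concrete, here is a minimal sketch. It does
not use the real ZooKeeper client: FakeSession, leaderWrite() and
readAfterFireAndForgetSync() are hypothetical names, and the single-threaded
executor is only an assumed stand-in for the per-session FIFO ordering of
requests. It shows a read that is queued behind an unchecked sync() still
observing the synced state.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class FireAndForgetSyncSketch {

  // Hypothetical stand-in for one client session. Assumption: requests from
  // a single session are processed strictly in submission order (FIFO),
  // modeled here by a single-threaded executor.
  static final class FakeSession implements AutoCloseable {
    private final ExecutorService pipeline = Executors.newSingleThreadExecutor();
    private final AtomicInteger leaderVersion = new AtomicInteger(0);
    private volatile int followerVersion = 0; // possibly stale view served to reads

    // A write that has reached the leader but not yet this follower.
    void leaderWrite() {
      leaderVersion.incrementAndGet();
    }

    // Async sync: queues a catch-up of the follower; the caller need not wait.
    CompletableFuture<Void> sync() {
      return CompletableFuture.runAsync(
          () -> followerVersion = leaderVersion.get(), pipeline);
    }

    // Read served from the follower view, queued behind earlier requests.
    CompletableFuture<Integer> read() {
      return CompletableFuture.supplyAsync(() -> followerVersion, pipeline);
    }

    @Override
    public void close() {
      pipeline.shutdown();
    }
  }

  // Issues sync() without checking its result, then reads.
  static int readAfterFireAndForgetSync(int writes) throws Exception {
    try (FakeSession session = new FakeSession()) {
      for (int i = 0; i < writes; i++) {
        session.leaderWrite();
      }
      session.sync();              // result intentionally ignored
      return session.read().get(); // ordered after the sync in the pipeline
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(readAfterFireAndForgetSync(3));
  }
}
```

The sketch leaves out the failure case that issueSync() handles in this PR:
if the session expires, a fire-and-forget caller would not learn that the
sync() failed, which is the trade-off mentioned above.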