jiajunwang commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r454696595
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
##########
@@ -50,4 +50,14 @@
*/
public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
"zk.serializer.znrecord.write.size.limit.bytes";
+
+ /**
+ * This property determines the behavior of ZkClient issuing an sync() to
server upon new session
+ * established.
+ *
+ * <p>
+ * The default value is "true" (issuing sync)
+ */
+ public static final String ZK_SYNC_UPON_NEWSESSION =
Review comment:
How about just calling it "zk.autosync.enabled"?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -187,6 +199,10 @@ protected ZkClient(IZkConnection zkConnection, int
connectionTimeout, long opera
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
+
+ String syncUpOnNewSession =
System.getProperty(ZkSystemPropertyKeys.ZK_SYNC_UPON_NEWSESSION, "true");
Review comment:
nit, shall we define the default value of ZK_SYNC_UPON_NEWSESSION in
ZkSystemPropertyKeys instead of hardcoding it here?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
Review comment:
Can we move it to the ZkAsyncCallbacks file so all the callbacks can be
found there?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
+ }
+ });
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
Review comment:
Can we move this check to the caller of retrySync()? That way the method
will do exactly what its name says.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
+ }
+ });
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+
+ SyncCallbackHandler callbackHandler = new SyncCallbackHandler(sessionId);
+
+ final ZkConnection zkConnection = (ZkConnection) getConnection();
Review comment:
I think you can just refer to the ZkClient object itself in the doAsyncSync
call to simplify this code. Or is there any concern about reading the ZooKeeper
object inside doAsyncSync, where it is actually used?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -105,6 +114,9 @@
// ZkEventThread. Otherwise the retry request might block the normal event
processing.
protected final ZkAsyncRetryThread _asyncCallRetryThread;
+ private final boolean _syncOnNewSession;
+ final String _syncPath = "/";
Review comment:
private static final String SYNC_PATH = "/" ?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
Review comment:
Is sync counted as a write operation? Just to confirm.
I know it is kind of happening in order like a write, but does it actually
update anything on the server-side? Note if we record the type differently,
then the server-side metric and client-side metric will have different numbers.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
+ }
+ });
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+
+ SyncCallbackHandler callbackHandler = new SyncCallbackHandler(sessionId);
+
+ final ZkConnection zkConnection = (ZkConnection) getConnection();
Review comment:
I guess your concern is that the ZooKeeper object may change if a new session
is established. But I think we can still keep retrying the sync call, although
that is in theory not possible because we stop retrying on session expiration.
Or is there anything else?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
+ }
+ });
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+
+ SyncCallbackHandler callbackHandler = new SyncCallbackHandler(sessionId);
+
+ final ZkConnection zkConnection = (ZkConnection) getConnection();
+ final ZooKeeper zk = zkConnection.getZookeeper();
+ final long startT = System.currentTimeMillis();
+ doAsyncSync(zk, _syncPath, startT, callbackHandler);
+
+ callbackHandler.waitForSuccess();
+
+ KeeperException.Code code =
KeeperException.Code.get(callbackHandler.getRc());
+ if (code == OK) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and
proceeds", sessionId,
+ code);
+ return true;
+ }
+
+ // Not retryable error, including session expiration; Log the error and
return
+ LOG.warn(
+ "sycnOnNewSession with sessionID {} async return code: {} and not
retryable, stop calling handleNewSession",
+ sessionId, code);
+ return false;
+ }
+
private void fireNewSessionEvents() {
// only managing zkclient fire handleNewSession event
if (!isManagingZkConnection()) {
return;
}
final String sessionId = getHexSessionId();
+
+ _eventThread.send(
+ new ZkEventThread.ZkEvent("Sync call before new session event of
session " + sessionId,
+ sessionId) {
+ @Override
+ public void run() throws Exception {
+ boolean syncStatus = retrySync(sessionId);
Review comment:
nit, to be compact,
if (retrySync(sessionId) == false) {
...
}
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
Review comment:
Pass the current system time or the metric number will be messed up.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,20 +1237,109 @@ private void reconnect() {
}
}
+ private class SyncCallbackHandler extends ZkAsyncCallbacks.DefaultCallback
implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {}",
_sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip
retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long
startT,
+ final SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor,
startT, 0, false) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, startT, cb);
+ }
+ });
+ }
+
+ private boolean retrySync(String sessionId) throws ZkInterruptedException {
+ if (!_syncOnNewSession) {
+ return true;
+ }
+
+ SyncCallbackHandler callbackHandler = new SyncCallbackHandler(sessionId);
+
+ final ZkConnection zkConnection = (ZkConnection) getConnection();
+ final ZooKeeper zk = zkConnection.getZookeeper();
+ final long startT = System.currentTimeMillis();
+ doAsyncSync(zk, _syncPath, startT, callbackHandler);
+
+ callbackHandler.waitForSuccess();
+
+ KeeperException.Code code =
KeeperException.Code.get(callbackHandler.getRc());
+ if (code == OK) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and
proceeds", sessionId,
+ code);
+ return true;
+ }
+
+ // Not retryable error, including session expiration; Log the error and
return
+ LOG.warn(
+ "sycnOnNewSession with sessionID {} async return code: {} and not
retryable, stop calling handleNewSession",
+ sessionId, code);
+ return false;
+ }
+
private void fireNewSessionEvents() {
// only managing zkclient fire handleNewSession event
if (!isManagingZkConnection()) {
return;
}
final String sessionId = getHexSessionId();
+
+ _eventThread.send(
Review comment:
As mentioned above, the check of _syncOnNewSession can be put here to
avoid the unnecessary ZK event.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]