jiajunwang commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r434994750



##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 
1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, 
clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, 
PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);

Review comment:
       We agreed on not adding more magic numbers as the sleep time before.
   Please use wait or verifier which loop and wait until the expected condition 
matches.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -555,7 +563,7 @@ private void subscribeDataChange(String path, 
NotificationContext.Type callbackT
         logger.debug(_manager.getInstanceName() + " subscribe data-change. 
path: " + path
             + ", listener: " + _listener);
       }
-      _zkClient.subscribeDataChanges(path, this);
+      _zkClient.subscribeDataChanges(path, this, callbackType != Type.INIT);

Review comment:
       Same here, we are not checking the result.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
##########
@@ -69,12 +69,49 @@
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+  /**
+   * Subscribe the path and the listener will handle child events of the path.
+   * Add exists watch to path if the path does not exist in ZooKeeper server.
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * The method return null if the path does not exists. Otherwise, return a 
list of children
+   * under the path. The list can be empty if there is no children.
+   */
   List<String> subscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle child events of the path
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * @param skipWatchingNodeNotExist True means not installing any watch if 
path does not exist.
+   * The method return ChildrentSubsribeResult. If the path does not exists, 
the isInstalled field
+   * is false. Otherwise, it is true and list of children are returned.
+   */
+  ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener 
listener, boolean skipWatchingNodeNotExist);
+
   void unsubscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle data events of the path
+   * Add the exists watch to Zookeeper server even if the path does not exists 
in zookeeper server
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   */
   void subscribeDataChanges(String path, IZkDataListener listener);

Review comment:
       Deprecate?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
##########
@@ -69,12 +69,49 @@
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+  /**
+   * Subscribe the path and the listener will handle child events of the path.
+   * Add exists watch to path if the path does not exist in ZooKeeper server.
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * The method return null if the path does not exists. Otherwise, return a 
list of children
+   * under the path. The list can be empty if there is no children.
+   */
   List<String> subscribeChildChanges(String path, IZkChildListener listener);

Review comment:
       Deprecate?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1865,17 +1934,24 @@ public Object call() throws Exception {
    *         exist.
    */
   public List<String> watchForChilds(final String path) {
+    return watchForChilds(path, false);
+  }
+
+  private List<String> watchForChilds(final String path, boolean 
skipWatchingNodeNotExist) {
     if (_zookeeperEventThread != null && Thread.currentThread() == 
_zookeeperEventThread) {
       throw new IllegalArgumentException("Must not be done in the zookeeper 
event thread.");
     }
     return retryUntilConnected(new Callable<List<String>>() {
       @Override
       public List<String> call() throws Exception {
-        exists(path, true);
+        if (!skipWatchingNodeNotExist) {

Review comment:
       > 1/ skipWatchingNodeNotExist true, if the parent path is not there. We 
will not install watch.
   
   What you are doing is, if skipWatchingNodeNotExist is true, then not install 
the watch no matter the parent path exists or not. Please double check.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -539,7 +539,15 @@ private void subscribeChildChange(String path, 
NotificationContext.Type callback
         logger.debug(_manager.getInstanceName() + " subscribes child-change. 
path: " + path
             + ", listener: " + _listener);
       }
-      _zkClient.subscribeChildChanges(path, this);
+      // In the lifecycle of CallbackHandler, INIT is the first stage of 
registration of watch.
+      // For some usage case such as current state, the path can be created 
later. Thus we would
+      // install watch anyway event the path is not yet created.
+      // Later, CALLBACK type, the CallbackHandler already registered the 
watch and knows the
+      // path was created. Here, to avoid leaking path in ZooKeeper server, we 
would not let
+      // CallbackHandler to install exists watch, namely watch for path not 
existing.
+      // Note when path is removed, the CallbackHanler would remove itself 
from ZkHelixManager too
+      // to avoid leaking a CallbackHandler.
+      _zkClient.subscribeChildChanges(path, this, callbackType != Type.INIT);

Review comment:
       Why not check the result here? If subscribe fails, shall we just ignore 
it?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
##########
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ClusterManager extends ZKHelixManager implements Runnable, 
ZkTestManager {
+  private static Logger LOG = 
LoggerFactory.getLogger(ClusterControllerManager.class);
+
+  protected CountDownLatch _startCountDown = new CountDownLatch(1);
+  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
+  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  protected boolean _started = false;
+
+  public ClusterManager(String zkAddr, String clusterName, InstanceType type) {
+    this(zkAddr, clusterName, "role", type);
+  }
+
+  public ClusterManager(String zkAddr, String clusterName, String roleName, 
InstanceType type) {

Review comment:
       roleName -> instanceName?
   
   And this constructor can be protected, only called in the children.

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
##########
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ClusterManager extends ZKHelixManager implements Runnable, 
ZkTestManager {
+  private static Logger LOG = 
LoggerFactory.getLogger(ClusterControllerManager.class);
+
+  protected CountDownLatch _startCountDown = new CountDownLatch(1);
+  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
+  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  protected boolean _started = false;
+
+  public ClusterManager(String zkAddr, String clusterName, InstanceType type) {

Review comment:
       This is not necessary, I think.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,45 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+
+public class ChildrenSubscribeResult {

Review comment:
       Add some Java doc to describe this new class please.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
##########
@@ -69,12 +69,49 @@
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+  /**
+   * Subscribe the path and the listener will handle child events of the path.
+   * Add exists watch to path if the path does not exist in ZooKeeper server.
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * The method return null if the path does not exists. Otherwise, return a 
list of children
+   * under the path. The list can be empty if there is no children.
+   */
   List<String> subscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle child events of the path
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * @param skipWatchingNodeNotExist True means not installing any watch if 
path does not exist.

Review comment:
       skipWatchingNonExistNode?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,45 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+
+public class ChildrenSubscribeResult {
+  private List<String> _children;

Review comment:
       final for both fields

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
##########
@@ -69,12 +69,49 @@
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+  /**
+   * Subscribe the path and the listener will handle child events of the path.
+   * Add exists watch to path if the path does not exist in ZooKeeper server.
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * The method return null if the path does not exists. Otherwise, return a 
list of children
+   * under the path. The list can be empty if there is no children.
+   */
   List<String> subscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle child events of the path
+   * WARNING: if the path is created after deletion, users need to 
re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * @param skipWatchingNodeNotExist True means not installing any watch if 
path does not exist.
+   * The method return ChildrentSubsribeResult. If the path does not exists, 
the isInstalled field

Review comment:
       @return ....

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1270,6 +1330,9 @@ public void deleteRecursively(String path) throws 
ZkClientException {
   private void processDataOrChildChange(WatchedEvent event, long 
notificationTime) {
     final String path = event.getPath();
     final boolean pathExists = event.getType() != EventType.NodeDeleted;
+    if (EventType.NodeDeleted == event.getType()) {
+      LOG.debug("event delelete: {}", event.getPath());

Review comment:
       Just curious, why we need this debug?
   
   And nit, if (LOG.isDebugEnabled() && ventType.NodeDeleted == event.getType())
   Just want to save some unnecessary computation.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1849,13 +1924,23 @@ private void checkDataSizeLimit(byte[] data) {
   }
 
   public void watchForData(final String path) {
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        getConnection().exists(path, true);
-        return null;
+    watchForData(path, false);
+  }
+
+  private boolean watchForData(final String path, boolean 
skipWatchingNodeNotExist) {
+    try {
+      if (skipWatchingNodeNotExist) {
+        Stat stat = new Stat();

Review comment:
       This one is not used anyway, just new a Stat in the lamda.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1304,7 +1356,11 @@ private void fireDataChangedEvents(final String path, 
Set<IZkDataListenerEntry>
           public void run() throws Exception {
             if (!pathStatRecord.pathChecked()) {
               // getStat will re-install watcher only when the path exists
-              pathStatRecord.recordPathStat(getStat(path, pathExists), 
notificationTime);
+              if (!pathExists) {

Review comment:
       I the current way, code readers may still not following. Please add some 
comments here for the logic.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1849,13 +1924,23 @@ private void checkDataSizeLimit(byte[] data) {
   }
 
   public void watchForData(final String path) {
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        getConnection().exists(path, true);
-        return null;
+    watchForData(path, false);
+  }
+
+  private boolean watchForData(final String path, boolean 
skipWatchingNodeNotExist) {
+    try {
+      if (skipWatchingNodeNotExist) {

Review comment:
       Will it be cleaner if we put this condition into the retryUntilConnected 
lamda?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to