pkuwm commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r435415302



##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1054,15 +1082,46 @@ public Stat getStat(final String path) {
   }
 
   private Stat getStat(final String path, final boolean watch) {
+    return getStat(path, watch, false);
+  }
+
+  /*
+   * Install watch if there is such node and return the stat
+   *
+   * If useGetData false, use exist(). @param watch would determine if adding 
watch
+   * to ZooKeeper server or not.
+   * Note, if @param path does not exist in ZooKeeper server, watch would 
still be installed
+   * if @param watch is true.
+   *
+   * If useGetData true, use getData() to add watch. ignore @param watch in 
this case.
+   * Note, if @param path does not exist in ZooKeeper server, no watch would 
be added.
+   */
+  private Stat getStat(final String path, final boolean watch, final boolean 
useGetData) {
     long startT = System.currentTimeMillis();
+    final Stat stat;
     try {
-      Stat stat = retryUntilConnected(
-          () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, 
watch));
+      if (!useGetData) {
+        stat = retryUntilConnected(
+            () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, 
watch));
+      } else {
+        stat = new Stat();
+        try {
+          LOG.debug("getstat() invoked with useGetData() with path: {} ", 
path);
+          retryUntilConnected(() -> ((ZkConnection) 
getConnection()).getZookeeper().getData(path, true, stat));
+        } catch (ZkNoNodeException e) {
+          LOG.debug("getstat() invoked path: {}  null  useGetData: {}", path, 
useGetData);

Review comment:
       What is this `null` representing? :)

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -555,7 +569,11 @@ private void subscribeDataChange(String path, 
NotificationContext.Type callbackT
         logger.debug(_manager.getInstanceName() + " subscribe data-change. 
path: " + path
             + ", listener: " + _listener);
       }
-      _zkClient.subscribeDataChanges(path, this);
+      boolean rt = _zkClient.subscribeDataChanges(path, this, callbackType != 
Type.INIT);

Review comment:
       It would be better to use Java-style naming here. A clearer variable name 
would be appreciated.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,54 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+/** Represents return type of {@link 
org.apache.helix.zookeeper.api.client.RealmAwareZkClient#subscribeChildChanges(String,
 IZkChildListener, boolean)}
+ *  The returned value would signal if watch installation to ZooKeeper server 
succeeded
+ *  or not using field _isInstalled. The _children field would contains the 
list of child names
+ *  of the watched path. It would be null if the parent path does not exist at 
the time of watch
+ *  installation.
+ */
+

Review comment:
       Not trying to be nitpicky, but usually there is no blank line between the 
Javadoc comment and the class signature.

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -318,6 +325,151 @@ public boolean verify() throws Exception {
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", 
"TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Routing provider is a spectator in Helix. Currentstate based RP listens 
on all the
+    // currentstate changes of all the clusters. They are a source of leaking 
of watch in
+    // Zookeeper server.
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, 
clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, 
PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB
+    LOG.info("expire rp manager session:", rpManager.getSessionId());
+    ZkTestHelper.expireSession(rpManager.getZkClient());
+    LOG.info("rp manager new session:", rpManager.getSessionId());
+
+    Thread.sleep(5000);
+
+    MockParticipantManager participantToExpire = participants[0];
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // expire participant session; leaked callback handler used to be not 
reset() and be removed from ZkClient
+    LOG.info(
+        "Expire participant: " + participantToExpire.getInstanceName() + ", 
session: "
+            + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + 
oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    Thread.sleep(5000);
+    Map<String, Set<IZkChildListener>> childListeners = 
ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+    for (String path : childListeners.keySet()) {
+      Assert.assertTrue(childListeners.get(path).size() <= 1);
+    }
+
+    Map<String, List<String>> rpWatchPaths = 
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
+  public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    final int mJobUpdateCnt = 500;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 
1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // HelixManager rpManager  = 
HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, 
ZK_ADDR);

Review comment:
       Could you remove the unused code instead of commenting it out?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 
1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, 
clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, 
PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);

Review comment:
       " all these verification does not guarantee you anything" What kind of 
verification did you mean? Did you mean the verifier? But how do you guarantee 
that sleeping for 5 seconds is 100% correct, i.e. that it would return the expected result?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,54 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+/** Represents return type of {@link 
org.apache.helix.zookeeper.api.client.RealmAwareZkClient#subscribeChildChanges(String,
 IZkChildListener, boolean)}
+ *  The returned value would signal if watch installation to ZooKeeper server 
succeeded
+ *  or not using field _isInstalled. The _children field would contains the 
list of child names
+ *  of the watched path. It would be null if the parent path does not exist at 
the time of watch
+ *  installation.
+ */
+
+public class ChildrenSubscribeResult {
+  private final List<String> _children;
+  private final boolean _isInstalled;
+
+  public ChildrenSubscribeResult(List<String> children, boolean isInstalled) {
+    if (children != null) {
+      _children = Collections.unmodifiableList(children);
+    } else {
+      _children = null;
+    }
+    _isInstalled = isInstalled;
+  }
+
+  public List<String> getChildren() {
+    return _children;
+  }
+
+  public boolean isInstalled() {

Review comment:
       I still think `isSubscribe()` is more consistent with the concepts in 
`subscribeChildChange()` and easier to understand. A reader might wonder: what is 
installed?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1865,17 +1955,24 @@ public Object call() throws Exception {
    *         exist.
    */
   public List<String> watchForChilds(final String path) {
+    return watchForChilds(path, false);
+  }
+
+  private List<String> watchForChilds(final String path, boolean 
skipWatchingNodeNotExist) {
     if (_zookeeperEventThread != null && Thread.currentThread() == 
_zookeeperEventThread) {
       throw new IllegalArgumentException("Must not be done in the zookeeper 
event thread.");
     }
     return retryUntilConnected(new Callable<List<String>>() {
       @Override
       public List<String> call() throws Exception {
-        exists(path, true);
+        if (!skipWatchingNodeNotExist) {
+          exists(path, true);
+        }
         try {
           return getChildren(path, true);
         } catch (ZkNoNodeException e) {
           // ignore, the "exists" watch will listen for the parent node to 
appear
+          LOG.info("watchForChilds path not existing:" + path + " 
skipWatchingNodeNoteExist:" + skipWatchingNodeNotExist);

Review comment:
       Please use parameterized logging here instead of string concatenation.

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -318,6 +325,151 @@ public boolean verify() throws Exception {
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", 
"TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Routing provider is a spectator in Helix. Currentstate based RP listens 
on all the
+    // currentstate changes of all the clusters. They are a source of leaking 
of watch in
+    // Zookeeper server.
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, 
clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, 
PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB
+    LOG.info("expire rp manager session:", rpManager.getSessionId());
+    ZkTestHelper.expireSession(rpManager.getZkClient());
+    LOG.info("rp manager new session:", rpManager.getSessionId());
+
+    Thread.sleep(5000);
+
+    MockParticipantManager participantToExpire = participants[0];
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // expire participant session; leaked callback handler used to be not 
reset() and be removed from ZkClient
+    LOG.info(
+        "Expire participant: " + participantToExpire.getInstanceName() + ", 
session: "
+            + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + 
oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    Thread.sleep(5000);
+    Map<String, Set<IZkChildListener>> childListeners = 
ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+    for (String path : childListeners.keySet()) {
+      Assert.assertTrue(childListeners.get(path).size() <= 1);
+    }
+
+    Map<String, List<String>> rpWatchPaths = 
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
+  public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    final int mJobUpdateCnt = 500;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 
1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // HelixManager rpManager  = 
HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, 
ZK_ADDR);
+    // rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, 
clusterName, "router");

Review comment:
       It is still a good idea to format the code using the Helix code style.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -555,7 +569,11 @@ private void subscribeDataChange(String path, 
NotificationContext.Type callbackT
         logger.debug(_manager.getInstanceName() + " subscribe data-change. 
path: " + path
             + ", listener: " + _listener);
       }
-      _zkClient.subscribeDataChanges(path, this);
+      boolean rt = _zkClient.subscribeDataChanges(path, this, callbackType != 
Type.INIT);
+      logger.debug("CallbackHandler {} subscribe data path {} result {}", 
this, path, rt);
+      if (!rt) {
+        logger.info("CallbackHandler {} subscribe data path {} failed!", this, 
path);

Review comment:
       I am wondering whether this message should be at info or warn level. If the 
failure may potentially cause problems, warn is probably expected. But if it is just a 
regular message, info is good enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to