pkuwm commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r433629379



##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,27 @@
+package org.apache.helix.zookeeper.api.client;
+
+import java.util.Collections;
+import java.util.List;
+
+
+public class ChildrenSubscribeResult {
+  private List<String> _children;
+  private boolean _isInstalled;

Review comment:
       Change it to `isSubscribed`?
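For illustration, the rename might look like the sketch below. Only the two field types come from the diff. The constructor, the accessor names, the `final` fields and the defensive copy are assumptions, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: constructor and accessor names are assumptions, not the PR's API.
class ChildrenSubscribeResult {
  private final List<String> _children;
  // Renamed from _isInstalled (assumed meaning: whether the subscription was set up).
  private final boolean _isSubscribed;

  ChildrenSubscribeResult(List<String> children, boolean isSubscribed) {
    if (children == null) {
      _children = Collections.emptyList();
    } else {
      // Copy so later changes to the caller's list are not visible here.
      _children = Collections.unmodifiableList(new ArrayList<>(children));
    }
    _isSubscribed = isSubscribed;
  }

  List<String> getChildren() {
    return _children;
  }

  boolean isSubscribed() {
    return _isSubscribed;
  }
}

class SubscribeResultDemo {
  public static void main(String[] args) {
    ChildrenSubscribeResult r = new ChildrenSubscribeResult(Arrays.asList("a", "b"), true);
    System.out.println(r.isSubscribed() + " " + r.getChildren()); // true [a, b]
  }
}
```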

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1849,13 +1924,23 @@ private void checkDataSizeLimit(byte[] data) {
   }
 
   public void watchForData(final String path) {
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        getConnection().exists(path, true);
-        return null;
+    watchForData(path, false);
+  }
+
+  private boolean watchForData(final String path, boolean skipWatchingNodeNotExist) {
+    try {
+      if (skipWatchingNodeNotExist) {
+        Stat stat = new Stat();

Review comment:
       @kaisun2000 `null` is acceptable for the `stat` parameter; it does not throw an exception. See [here](https://github.com/apache/zookeeper/blob/b0aaa7de5b28375cd34ead8c8fbf14e3ebddd30f/src/java/test/org/apache/zookeeper/test/AuthTest.java#L70). So there is no need to create a `Stat` here for this API.
   ```
       public void testBadAuthThenSendOtherCommands() throws Exception {
           ZooKeeper zk = createClient();     
           try {        
               zk.addAuthInfo("INVALID", "BAR".getBytes());
               zk.exists("/foobar", false);             
               zk.getData("/path1", false, null);
               Assert.fail("Should get auth state error");
   ```

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =

Review comment:
       `Boolean` -> `boolean`?
   Also, this line does not seem to be formatted according to the code style.
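A sketch of why the primitive is preferable, assuming the verifier returns a primitive `boolean` (not confirmed by the diff): boxing it into `Boolean` adds nothing, since `Assert.assertTrue(boolean)` unboxes it again, and a `Boolean` that happens to be `null` throws `NullPointerException` on unboxing instead of producing a readable assertion failure. `verifyBoxed` below is a hypothetical verifier used only to show the null case.

```java
// Illustration only; verifyBoxed is a hypothetical verifier that returns a wrapper.
class BooleanVsPrimitive {
  // A primitive result can never be null.
  static boolean verifyPrimitive() {
    return true;
  }

  // A wrapper result can be null, e.g. when no verdict was produced.
  static Boolean verifyBoxed(boolean haveVerdict) {
    return haveVerdict ? Boolean.TRUE : null;
  }

  // Mimics passing a Boolean to a method that takes a primitive boolean.
  static String unboxOutcome(Boolean value) {
    try {
      boolean b = value; // auto-unboxing: throws NullPointerException when value is null
      return "unboxed:" + b;
    } catch (NullPointerException e) {
      return "NPE";
    }
  }

  public static void main(String[] args) {
    boolean result = verifyPrimitive(); // preferred: no boxing, no null
    System.out.println(result);                           // true
    System.out.println(unboxOutcome(verifyBoxed(true)));  // unboxed:true
    System.out.println(unboxOutcome(verifyBoxed(false))); // NPE
  }
}
```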

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);

Review comment:
       Could we add a comment explaining why constructing this RoutingTableProvider is necessary, given that the variable `rp` is never used?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB
+    LOG.info("expire rp manager session:", rpManager.getSessionId());
+    ZkTestHelper.expireSession(rpManager.getZkClient());
+    LOG.info("rp manager new session:", rpManager.getSessionId());
+
+    Thread.sleep(5000);
+
+    MockParticipantManager participantToExpire = participants[0];
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+    LOG.info(
+        "Expire participant: " + participantToExpire.getInstanceName() + ", session: "
+            + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    Thread.sleep(5000);
+    Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+    for (String path : childListeners.keySet()) {
+      Assert.assertTrue(childListeners.get(path).size() <= 1);
+    }
+
+    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
+  public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    final int mJobUpdateCnt = 500;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // HelixManager rpManager  = HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    // rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    LOG.info("add job");
+    MockParticipantManager jobParticipant = participants[0];
+    String jobSessionId = jobParticipant.getSessionId();
+    HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+    PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
+    PropertyKey db0key = jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "TestDB0");
+    CurrentState db0 = jobAccesor.getProperty(db0key);
+    PropertyKey jobKey = jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "BackupQueue");
+    CurrentState cs = new CurrentState("BackupQueue");
+    cs.setSessionId(jobSessionId);
+    cs.setStateModelDefRef(db0.getStateModelDefRef());
+
+    LOG.info("add job");
+    boolean rtJob = jobAccesor.setProperty(jobKey, cs);

Review comment:
       Is this line redundant?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB

Review comment:
       Now that you have a fix, would expiring the Helix manager still create a dangling callback handler (CB)?

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1270,6 +1330,9 @@ public void deleteRecursively(String path) throws ZkClientException {
   private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
     final boolean pathExists = event.getType() != EventType.NodeDeleted;
+    if (EventType.NodeDeleted == event.getType()) {
+      LOG.debug("event delelete: {}", event.getPath());

Review comment:
       Typo: `delelete` -> `delete`.
   It may also be a good idea to make the message clearer, e.g. "Event NodeDeleted, path: {}".

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -748,6 +756,10 @@ public void handleChildChange(String parentPath, List<String> currentChilds) {
           // removeListener will call handler.reset(), which in turn call invoke() on FINALIZE type
           _manager.removeListener(_propertyKey, _listener);
         } else {
+          if (!isReady()) {
+            // avoid leaking CallbackHandler

Review comment:
       Could we add a log here for this case?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test

Review comment:
       Could we add an empty line before the new test method? And add some comments describing what this test does?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory

Review comment:
       If we don't need these lines, let's remove them instead of commenting them out.

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;

Review comment:
       Do we need the extra variable `zkAddr`? We could use `ZK_ADDR` directly; we don't want to overuse variables.

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1054,15 +1082,47 @@ public Stat getStat(final String path) {
   }
 
   private Stat getStat(final String path, final boolean watch) {
+    return getStat(path, watch, false);
+  }
+
+  /*
+   * Install watch if there is such node and return the stat
+   *
+   * If useGetData false, use exist(). @param watch would determine if adding watch
+   * to ZooKeeper server or not.
+   * Note, if @param path does not exist in ZooKeeper server, watch would still be installed
+   * if @param watch is true.
+   *
+   * If useGetData true, use getData() to add watch. ignore @param watch in this case.
+   * Note, if @param path does not exist in ZooKeeper server, no watch would be added.
+   */
+  private Stat getStat(final String path, final boolean watch, final boolean useGetData) {
     long startT = System.currentTimeMillis();
+    Stat stat = null;
     try {
-      Stat stat = retryUntilConnected(
-          () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+      if (!useGetData) {
+        stat = retryUntilConnected(
+            () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+      } else {
+        stat = new Stat();
+        try {
+          LOG.debug("getstat() invoked with useGetData() with path: " + path);
+          Stat finalStat = stat;
+          retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, finalStat));
+        } catch (ZkNoNodeException e) {
+          LOG.debug("getstat() invoked path: " + path + " null" + " useGetData:" + useGetData);

Review comment:
       I meant parameterized logging for all the logging you are adding. This call is at debug level, and we usually only enable info or warn level, yet the string is still concatenated. With parameterized logging, the string concatenation overhead is eliminated when only info/warn is enabled, right? I strongly suggest changing all the logging you are adding.
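To make the overhead concrete, here is a toy model of SLF4J-style `{}` logging; `ToyLogger` and `CountingArg` are hypothetical stand-ins, not SLF4J classes. With concatenation, the argument's `toString()` runs even when DEBUG is off; with a placeholder, the disabled call does no formatting work. The last call shows a related pitfall in the new test: `LOG.info("expire rp manager session:", rpManager.getSessionId())` has no `{}`, so with SLF4J-style formatting the session ID is silently dropped.

```java
// Toy model of SLF4J-style "{}" logging; ToyLogger and CountingArg are hypothetical.
class ToyLogger {
  private final boolean debugEnabled;

  ToyLogger(boolean debugEnabled) {
    this.debugEnabled = debugEnabled;
  }

  // Returns the rendered message, or null when DEBUG is disabled.
  String debug(String pattern, Object... args) {
    if (!debugEnabled) {
      return null; // level check happens before any argument is rendered
    }
    StringBuilder sb = new StringBuilder();
    int start = 0;
    int argIdx = 0;
    int brace;
    // Replace each "{}" with the next argument, left to right.
    while (argIdx < args.length && (brace = pattern.indexOf("{}", start)) >= 0) {
      sb.append(pattern, start, brace).append(args[argIdx++]);
      start = brace + 2;
    }
    // Copy the rest of the pattern; arguments without a matching "{}" are dropped.
    sb.append(pattern, start, pattern.length());
    return sb.toString();
  }
}

// Counts how often its string form is computed.
class CountingArg {
  int toStringCalls = 0;

  @Override
  public String toString() {
    toStringCalls++;
    return "/EXAMPLE/path";
  }
}

class ParamLoggingDemo {
  public static void main(String[] args) {
    ToyLogger infoOnly = new ToyLogger(false);
    CountingArg path = new CountingArg();
    infoOnly.debug("getStat() invoked, path: " + path); // concatenation renders path first
    infoOnly.debug("getStat() invoked, path: {}", path); // DEBUG off: path never rendered
    System.out.println(path.toStringCalls); // 1

    ToyLogger debugOn = new ToyLogger(true);
    System.out.println(debugOn.debug("Event NodeDeleted, path: {}", "/a")); // Event NodeDeleted, path: /a
    System.out.println(debugOn.debug("expire rp manager session:", "s1")); // expire rp manager session:
  }
}
```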

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);

Review comment:
       Why does it need to sleep 5 seconds? Is it deterministic?
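If the sleep is meant to wait for callbacks to settle, one common alternative is to poll for the expected condition with a timeout: it returns as soon as the condition holds and fails clearly when it never does. `PollUntil.waitFor` below is a hypothetical helper, not an existing Helix utility; in this test the condition could be, for example, the listener-count check done via `ZkTestHelper.getZkChildListener`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition instead of a fixed Thread.sleep(5000).
final class PollUntil {
  private PollUntil() {}

  // Returns true as soon as the condition holds, false once timeoutMs has elapsed.
  static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}

class PollDemo {
  public static void main(String[] args) {
    AtomicInteger polls = new AtomicInteger();
    // Stand-in condition that becomes true on the third check.
    boolean ok = PollUntil.waitFor(() -> polls.incrementAndGet() >= 3, 5000, 10);
    System.out.println(ok + " after " + polls.get() + " polls"); // true after 3 polls
    System.out.println(PollUntil.waitFor(() -> false, 50, 10));  // false
  }
}
```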

##########
File path: 
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1054,15 +1082,47 @@ public Stat getStat(final String path) {
   }
 
   private Stat getStat(final String path, final boolean watch) {
+    return getStat(path, watch, false);
+  }
+
+  /*
+   * Install watch if there is such node and return the stat
+   *
+   * If useGetData false, use exist(). @param watch would determine if adding watch
+   * to ZooKeeper server or not.
+   * Note, if @param path does not exist in ZooKeeper server, watch would still be installed
+   * if @param watch is true.
+   *
+   * If useGetData true, use getData() to add watch. ignore @param watch in this case.
+   * Note, if @param path does not exist in ZooKeeper server, no watch would be added.
+   */
+  private Stat getStat(final String path, final boolean watch, final boolean useGetData) {
     long startT = System.currentTimeMillis();
+    Stat stat = null;
     try {
-      Stat stat = retryUntilConnected(
-          () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+      if (!useGetData) {
+        stat = retryUntilConnected(
+            () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+      } else {
+        stat = new Stat();
+        try {
+          LOG.debug("getstat() invoked with useGetData() with path: " + path);
+          Stat finalStat = stat;

Review comment:
       @kaisun2000 This is not resolved yet. The reason IntelliJ complains is that a local variable used in a lambda must be final or effectively final, but you initialize `stat` to null before the try block and assign it again later, so `stat` is not effectively final.
   
   To avoid `finalStat`, you could remove the `null` initialization so the lambda can capture `stat` directly; then `finalStat` can be removed. We don't want to overuse variables, right? Thanks.
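A minimal sketch of the suggestion, using hypothetical stand-ins for `Stat`, `retryUntilConnected`, `exists` and `getData` (they mimic the shape of the code under review, not ZooKeeper's or ZkClient's real signatures). Because `stat` is declared without an initializer and assigned exactly once on each path, it is effectively final (JLS §4.12.4), so the lambda can capture it directly. The no-node case returns `null` instead of reassigning `stat`, since a reassignment would make it non-effectively-final again.

```java
import java.util.concurrent.Callable;

// Sketch with hypothetical stand-ins: Stat, NoNodeException, retryUntilConnected,
// exists and getData only mimic the shape of the ZkClient code under review.
class EffectivelyFinalSketch {
  static class Stat {
    long version;
  }

  static class NoNodeException extends RuntimeException {
  }

  static <T> T retryUntilConnected(Callable<T> op) {
    try {
      return op.call();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // Only "/present" exists in this toy namespace.
  static Stat exists(String path) {
    return "/present".equals(path) ? new Stat() : null;
  }

  static byte[] getData(String path, Stat out) {
    if (!"/present".equals(path)) {
      throw new NoNodeException();
    }
    out.version = 7; // fill the caller-supplied Stat
    return new byte[0];
  }

  static Stat getStat(String path, boolean useGetData) {
    // No "= null" initializer: stat is assigned exactly once on each path,
    // so it is effectively final and the lambda can capture it directly.
    Stat stat;
    if (!useGetData) {
      stat = retryUntilConnected(() -> exists(path));
    } else {
      stat = new Stat();
      try {
        retryUntilConnected(() -> getData(path, stat));
      } catch (NoNodeException e) {
        return null; // return rather than reassign, keeping stat effectively final
      }
    }
    return stat;
  }

  public static void main(String[] args) {
    System.out.println(getStat("/present", true).version); // 7
    System.out.println(getStat("/missing", true));         // null
    System.out.println(getStat("/missing", false));        // null
  }
}
```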

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+  @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    //HelixManager rpManager  = HelixManagerFactory
+    //    .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    //rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB
+    LOG.info("expire rp manager session:", rpManager.getSessionId());
+    ZkTestHelper.expireSession(rpManager.getZkClient());
+    LOG.info("rp manager new session:", rpManager.getSessionId());
+
+    Thread.sleep(5000);
+
+    MockParticipantManager participantToExpire = participants[0];
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+    LOG.info(
+        "Expire participant: " + participantToExpire.getInstanceName() + ", session: "
+            + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    Thread.sleep(5000);
+    Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+    for (String path : childListeners.keySet()) {
+      Assert.assertTrue(childListeners.get(path).size() <= 1);
+    }
+
+    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
+  public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    final int mJobUpdateCnt = 500;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // HelixManager rpManager  = HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+    // rpManager.connect();
+    ClusterSpectatorManager rpManager  = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    LOG.info("add job");
+    MockParticipantManager jobParticipant = participants[0];
+    String jobSessionId = jobParticipant.getSessionId();
+    HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+    PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
+    PropertyKey db0key = jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "TestDB0");
+    CurrentState db0 = jobAccesor.getProperty(db0key);
+    PropertyKey jobKey = jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "BackupQueue");
+    CurrentState cs = new CurrentState("BackupQueue");
+    cs.setSessionId(jobSessionId);
+    cs.setStateModelDefRef(db0.getStateModelDefRef());
+
+    LOG.info("add job");
+    boolean rtJob = jobAccesor.setProperty(jobKey, cs);
+    for (int i = 0; i < mJobUpdateCnt; i++) {
+      rtJob = jobAccesor.setProperty(jobKey, cs);
+    }
+
+    LOG.info("remove job");
+    rtJob =jobParticipant.getZkClient().delete(jobKey.getPath());

Review comment:
       1. The test method name says `AsyncRemoval`. Is this removal actually async?
   2. Should we also verify the watches before removal, to make sure they are really added and then removed?
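For point 2, a check along these lines could compare watch snapshots taken before and after the delete. It assumes snapshots shaped like the `Map<String, List<String>>` that `ZkTestHelper.getZkWatch` returns in this test; only the `existWatches` key appears in the PR, so the `dataWatches` key, the example path and the `WatchCheck` helper are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper over watch snapshots keyed by watch type (e.g. "existWatches").
final class WatchCheck {
  private WatchCheck() {}

  // True if any watch type in the snapshot lists the path.
  static boolean isWatched(Map<String, List<String>> snapshot, String path) {
    for (List<String> paths : snapshot.values()) {
      if (paths != null && paths.contains(path)) {
        return true;
      }
    }
    return false;
  }

  // True if the path was watched before the delete and no watch remains after it.
  static boolean addedThenRemoved(Map<String, List<String>> before,
      Map<String, List<String>> after, String path) {
    return isWatched(before, path) && !isWatched(after, path);
  }
}

class WatchCheckDemo {
  public static void main(String[] args) {
    String jobPath = "/EXAMPLE/CURRENTSTATES/BackupQueue"; // illustrative path
    Map<String, List<String>> before = new HashMap<>();
    before.put("dataWatches", Arrays.asList(jobPath)); // assumed key name
    before.put("existWatches", Collections.emptyList());
    Map<String, List<String>> after = new HashMap<>();
    after.put("dataWatches", Collections.emptyList());
    after.put("existWatches", Collections.emptyList());
    System.out.println(WatchCheck.addedThenRemoved(before, after, jobPath)); // true
    System.out.println(WatchCheck.addedThenRemoved(after, after, jobPath));  // false
  }
}
```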




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
