kaisun2000 commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r435639531
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -318,6 +325,151 @@ public boolean verify() throws Exception {
System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
}
+ @Test
+ public void testDanglingCallbackHanlderFix() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost",
"TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName,
instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new
ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Routing provider is a spectator in Helix. Currentstate based RP listens on all the
+ // currentstate changes of all the clusters. They are a source of leaking of watch in
+ // Zookeeper server.
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR,
clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager,
PropertyType.CURRENTSTATES);
+
+ Thread.sleep(5000);
+
+ // expire RoutingProvider would create dangling CB
+ LOG.info("expire rp manager session:", rpManager.getSessionId());
+ ZkTestHelper.expireSession(rpManager.getZkClient());
+ LOG.info("rp manager new session:", rpManager.getSessionId());
+
+ Thread.sleep(5000);
+
+ MockParticipantManager participantToExpire = participants[0];
+ String oldSessionId = participantToExpire.getSessionId();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+ // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+ LOG.info(
+ "Expire participant: " + participantToExpire.getInstanceName() + ",
session: "
+ + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " +
oldSessionId
+ + ", newSessionId: " + newSessionId);
+
+ Thread.sleep(5000);
+ Map<String, Set<IZkChildListener>> childListeners =
ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+ for (String path : childListeners.keySet()) {
+ Assert.assertTrue(childListeners.get(path).size() <= 1);
+ }
+
+ Map<String, List<String>> rpWatchPaths =
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+ List<String> existWatches = rpWatchPaths.get("existWatches");
+ Assert.assertTrue(existWatches.isEmpty());
+ }
+
+ @Test
+ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ final int mJobUpdateCnt = 500;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB",
1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName,
instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // HelixManager rpManager = HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
Review comment:
fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]