kaisun2000 commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r435001676
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
}
+ @Test
+ public void testDanglingCallbackHanlderFix() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB",
1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName,
instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ //HelixManager rpManager = HelixManagerFactory
+ // .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+ //rpManager.connect();
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR,
clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager,
PropertyType.CURRENTSTATES);
+
+ Thread.sleep(5000);
+
+ // expire RoutingProvider would create dangling CB
+ LOG.info("expire rp manager session:", rpManager.getSessionId());
+ ZkTestHelper.expireSession(rpManager.getZkClient());
+ LOG.info("rp manager new session:", rpManager.getSessionId());
+
+ Thread.sleep(5000);
+
+ MockParticipantManager participantToExpire = participants[0];
+ String oldSessionId = participantToExpire.getSessionId();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+ // expire participant session; leaked callback handler used to be not
reset() and be removed from ZkClient
+ LOG.info(
+ "Expire participant: " + participantToExpire.getInstanceName() + ",
session: "
+ + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " +
oldSessionId
+ + ", newSessionId: " + newSessionId);
+
+ Thread.sleep(5000);
+ Map<String, Set<IZkChildListener>> childListeners =
ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+ for (String path : childListeners.keySet()) {
+ Assert.assertTrue(childListeners.get(path).size() <= 1);
+ }
+
+ Map<String, List<String>> rpWatchPaths =
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+ List<String> existWatches = rpWatchPaths.get("existWatches");
+ Assert.assertTrue(existWatches.isEmpty());
+ }
+
+ @Test
+ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ final int mJobUpdateCnt = 500;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB",
1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName,
instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // HelixManager rpManager =
HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR,
ZK_ADDR);
+ // rpManager.connect();
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR,
clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager,
PropertyType.CURRENTSTATES);
+
+ LOG.info("add job");
+ MockParticipantManager jobParticipant = participants[0];
+ String jobSessionId = jobParticipant.getSessionId();
+ HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+ PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
+ PropertyKey db0key =
jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId,
"TestDB0");
+ CurrentState db0 = jobAccesor.getProperty(db0key);
+ PropertyKey jobKey =
jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId,
"BackupQueue");
+ CurrentState cs = new CurrentState("BackupQueue");
+ cs.setSessionId(jobSessionId);
+ cs.setStateModelDefRef(db0.getStateModelDefRef());
+
+ LOG.info("add job");
+ boolean rtJob = jobAccesor.setProperty(jobKey, cs);
+ for (int i = 0; i < mJobUpdateCnt; i++) {
+ rtJob = jobAccesor.setProperty(jobKey, cs);
+ }
+
+ LOG.info("remove job");
+ rtJob =jobParticipant.getZkClient().delete(jobKey.getPath());
Review comment:
1/ This removal runs asynchronously with respect to the routing table provider's
thread. See case 2 in my investigation doc for why it is asynchronous. Without my
fix, there are at least 3 ways this can leak.
2/ Not needed. If the watch were not installed, the RoutingTableProvider (RP)
would not work at all, and checking that is not the purpose of this test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]