pkuwm commented on a change in pull request #1035:
URL: https://github.com/apache/helix/pull/1035#discussion_r435415302
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1054,15 +1082,46 @@ public Stat getStat(final String path) {
}
private Stat getStat(final String path, final boolean watch) {
+ return getStat(path, watch, false);
+ }
+
+ /*
+ * Install watch if there is such node and return the stat
+ *
+ * If useGetData false, use exist(). @param watch would determine if adding watch
+ * to ZooKeeper server or not.
+ * Note, if @param path does not exist in ZooKeeper server, watch would still be installed
+ * if @param watch is true.
+ *
+ * If useGetData true, use getData() to add watch. ignore @param watch in this case.
+ * Note, if @param path does not exist in ZooKeeper server, no watch would be added.
+ */
+ private Stat getStat(final String path, final boolean watch, final boolean useGetData) {
long startT = System.currentTimeMillis();
+ final Stat stat;
try {
- Stat stat = retryUntilConnected(
- () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+ if (!useGetData) {
+ stat = retryUntilConnected(
+ () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
+ } else {
+ stat = new Stat();
+ try {
+ LOG.debug("getstat() invoked with useGetData() with path: {} ", path);
+ retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, stat));
+ } catch (ZkNoNodeException e) {
+ LOG.debug("getstat() invoked path: {} null useGetData: {}", path, useGetData);
Review comment:
What does this `null` represent? :)
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -555,7 +569,11 @@ private void subscribeDataChange(String path, NotificationContext.Type callbackT
logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
+ ", listener: " + _listener);
}
- _zkClient.subscribeDataChanges(path, this);
+ boolean rt = _zkClient.subscribeDataChanges(path, this, callbackType != Type.INIT);
Review comment:
Could we use Java-style naming here? A clearer variable name than `rt`
would be appreciated.
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,54 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+/** Represents return type of {@link org.apache.helix.zookeeper.api.client.RealmAwareZkClient#subscribeChildChanges(String, IZkChildListener, boolean)}
+ * The returned value would signal if watch installation to ZooKeeper server succeeded
+ * or not using field _isInstalled. The _children field would contains the list of child names
+ * of the watched path. It would be null if the parent path does not exist at the time of watch
+ * installation.
+ */
+
Review comment:
Not trying to nitpick, but there is usually no blank line between the
Javadoc comment and the class signature.
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -318,6 +325,151 @@ public boolean verify() throws Exception {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testDanglingCallbackHanlderFix() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Routing provider is a spectator in Helix. Currentstate based RP listens on all the
+ // currentstate changes of all the clusters. They are a source of leaking of watch in
+ // Zookeeper server.
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+ Thread.sleep(5000);
+
+ // expire RoutingProvider would create dangling CB
+ LOG.info("expire rp manager session:", rpManager.getSessionId());
+ ZkTestHelper.expireSession(rpManager.getZkClient());
+ LOG.info("rp manager new session:", rpManager.getSessionId());
+
+ Thread.sleep(5000);
+
+ MockParticipantManager participantToExpire = participants[0];
+ String oldSessionId = participantToExpire.getSessionId();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+ // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+ LOG.info(
+ "Expire participant: " + participantToExpire.getInstanceName() + ", session: "
+ + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+ + ", newSessionId: " + newSessionId);
+
+ Thread.sleep(5000);
+ Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+ for (String path : childListeners.keySet()) {
+ Assert.assertTrue(childListeners.get(path).size() <= 1);
+ }
+
+ Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+ List<String> existWatches = rpWatchPaths.get("existWatches");
+ Assert.assertTrue(existWatches.isEmpty());
+ }
+
+ @Test
+ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ final int mJobUpdateCnt = 500;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // HelixManager rpManager = HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
Review comment:
Could you remove the unused code instead of commenting it out?
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -317,6 +324,150 @@ public boolean verify() throws Exception {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testDanglingCallbackHanlderFix() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ //HelixManager rpManager = HelixManagerFactory
+ // .getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+ //rpManager.connect();
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+ Thread.sleep(5000);
Review comment:
" all these verification does not guarantee you anything" What kind of
verification did you mean? Did you mean the verifier? And how do you guarantee
that sleeping for 5 seconds is 100% reliable, i.e. that it always returns the
expected result?
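To illustrate the alternative to a fixed sleep, here is a minimal sketch of polling a condition with a timeout. `PollingWait` and `waitUntil` are hypothetical names invented for this comment, not Helix APIs, and the condition in `main` is only a stand-in for "the listener/watch state has settled".

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not part of Helix): poll a condition instead of sleeping a fixed time. */
final class PollingWait {
  private PollingWait() {
  }

  /**
   * Re-checks the condition every intervalMs until it holds or timeoutMs has elapsed.
   * Returns true if the condition was observed to hold, false on timeout or interruption.
   */
  static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long intervalMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt status and give up early.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Stand-in condition: becomes true roughly 200 ms after start.
    boolean settled = waitUntil(() -> System.currentTimeMillis() - start >= 200, 5000, 20);
    System.out.println("settled = " + settled);
  }
}
```

In the test, the condition could wrap the checks it already makes, so the test continues as soon as they hold and fails with an explicit timeout otherwise, rather than depending on 5 seconds always being enough.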
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
##########
@@ -0,0 +1,54 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+/** Represents return type of {@link org.apache.helix.zookeeper.api.client.RealmAwareZkClient#subscribeChildChanges(String, IZkChildListener, boolean)}
+ * The returned value would signal if watch installation to ZooKeeper server succeeded
+ * or not using field _isInstalled. The _children field would contains the list of child names
+ * of the watched path. It would be null if the parent path does not exist at the time of watch
+ * installation.
+ */
+
+public class ChildrenSubscribeResult {
+ private final List<String> _children;
+ private final boolean _isInstalled;
+
+ public ChildrenSubscribeResult(List<String> children, boolean isInstalled) {
+ if (children != null) {
+ _children = Collections.unmodifiableList(children);
+ } else {
+ _children = null;
+ }
+ _isInstalled = isInstalled;
+ }
+
+ public List<String> getChildren() {
+ return _children;
+ }
+
+ public boolean isInstalled() {
Review comment:
I still think `isSubscribe()` is more consistent with the concepts in
`subscribeChildChange()` and easier to understand. Otherwise a reader may
wonder: what is installed?
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1865,17 +1955,24 @@ public Object call() throws Exception {
* exist.
*/
public List<String> watchForChilds(final String path) {
+ return watchForChilds(path, false);
+ }
+
+ private List<String> watchForChilds(final String path, boolean skipWatchingNodeNotExist) {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
return retryUntilConnected(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
- exists(path, true);
+ if (!skipWatchingNodeNotExist) {
+ exists(path, true);
+ }
try {
return getChildren(path, true);
} catch (ZkNoNodeException e) {
// ignore, the "exists" watch will listen for the parent node to appear
+ LOG.info("watchForChilds path not existing:" + path + " skipWatchingNodeNoteExist:" + skipWatchingNodeNotExist);
Review comment:
Please use parameterized logging here instead of string concatenation.
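A minimal, dependency-free sketch of the idea, for illustration only. It uses `java.util.logging` (placeholders `{0}`, `{1}`) so that it runs on its own; with the `{}`-style logger already used elsewhere in this PR (e.g. `logger.debug("CallbackHandler {} subscribe data path {} result {}", this, path, rt)`) the same message would use `{}` placeholders. `ParameterizedLoggingSketch`, `CapturingHandler` and `logMissingPath` are hypothetical names, not part of ZkClient.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

/** Hypothetical sketch: pass arguments to the logger instead of concatenating strings. */
final class ParameterizedLoggingSketch {
  private ParameterizedLoggingSketch() {
  }

  /** Collects formatted messages so the result can be inspected; illustration only. */
  static final class CapturingHandler extends Handler {
    final List<String> messages = new ArrayList<>();
    private final SimpleFormatter formatter = new SimpleFormatter();

    @Override
    public void publish(LogRecord record) {
      // Placeholders are substituted here, only for records that passed the level check.
      messages.add(formatter.formatMessage(record));
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
  }

  static List<String> logMissingPath(String path, boolean skipWatchingNodeNotExist) {
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false);
    log.setLevel(Level.INFO);
    CapturingHandler handler = new CapturingHandler();
    log.addHandler(handler);

    // The message template and its arguments stay separate until the record is formatted.
    log.log(Level.INFO, "watchForChilds path not existing: {0}, skipWatchingNodeNotExist: {1}",
        new Object[] {path, skipWatchingNodeNotExist});
    // FINE is below the configured INFO level, so this record is dropped without formatting.
    log.log(Level.FINE, "not emitted for path {0}", new Object[] {path});
    return handler.messages;
  }

  public static void main(String[] args) {
    logMissingPath("/CLUSTER/INSTANCES", true).forEach(System.out::println);
  }
}
```

The point is the same in either logging API: the message is assembled only when the level is enabled, and the template stays readable.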
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
##########
@@ -318,6 +325,151 @@ public boolean verify() throws Exception {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testDanglingCallbackHanlderFix() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Routing provider is a spectator in Helix. Currentstate based RP listens on all the
+ // currentstate changes of all the clusters. They are a source of leaking of watch in
+ // Zookeeper server.
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+ rpManager.syncStart();
+ RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+ Thread.sleep(5000);
+
+ // expire RoutingProvider would create dangling CB
+ LOG.info("expire rp manager session:", rpManager.getSessionId());
+ ZkTestHelper.expireSession(rpManager.getZkClient());
+ LOG.info("rp manager new session:", rpManager.getSessionId());
+
+ Thread.sleep(5000);
+
+ MockParticipantManager participantToExpire = participants[0];
+ String oldSessionId = participantToExpire.getSessionId();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+ // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+ LOG.info(
+ "Expire participant: " + participantToExpire.getInstanceName() + ", session: "
+ + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+ + ", newSessionId: " + newSessionId);
+
+ Thread.sleep(5000);
+ Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+ for (String path : childListeners.keySet()) {
+ Assert.assertTrue(childListeners.get(path).size() <= 1);
+ }
+
+ Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+ List<String> existWatches = rpWatchPaths.get("existWatches");
+ Assert.assertTrue(existWatches.isEmpty());
+ }
+
+ @Test
+ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ final int mJobUpdateCnt = 500;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave", true);
+
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ Boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // HelixManager rpManager = HelixManagerFactory.getZKHelixManager(clusterName, "", InstanceType.SPECTATOR, ZK_ADDR);
+ // rpManager.connect();
+ ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
Review comment:
It would still be a good idea to format the code using the Helix code style.
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -555,7 +569,11 @@ private void subscribeDataChange(String path, NotificationContext.Type callbackT
logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
+ ", listener: " + _listener);
}
- _zkClient.subscribeDataChanges(path, this);
+ boolean rt = _zkClient.subscribeDataChanges(path, this, callbackType != Type.INIT);
+ logger.debug("CallbackHandler {} subscribe data path {} result {}", this, path, rt);
+ if (!rt) {
+ logger.info("CallbackHandler {} subscribe data path {} failed!", this, path);
Review comment:
I am wondering whether this message should be logged at info or warn level.
If the failure may potentially cause problems, warn would be expected; if it
is just a regular message, info is good enough.