dasahcc commented on a change in pull request #1149:
URL: https://github.com/apache/helix/pull/1149#discussion_r455983948
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -866,6 +858,83 @@ public void disconnect() {
}
}
+ /**
+ * The callback handler cleanup operations that require an active ZkClient
connection.
+ * If ZkClient is not connected, Helix Manager shall skip the cleanup.
+ *
+ * @return true if the cleanup has been done successfully.
+ */
+ private boolean cleanupCallbackHandlers() {
+ AtomicBoolean cleanupDone = new AtomicBoolean(false);
+
+ if (_zkclient.waitUntilConnected(_waitForConnectedTimeout,
TimeUnit.MILLISECONDS)) {
+ // Create a separate thread for executing cleanup task to avoid forever
retry.
+ Thread cleanupThread = new Thread(String
+ .format("Cleanup thread for %s-%s-%s", _clusterName, _instanceName,
_instanceType)) {
+ @Override
+ public void run() {
+ // TODO reset user defined handlers only
+ resetHandlers(true);
+
+ if (_leaderElectionHandler != null) {
+ _leaderElectionHandler.reset(true);
+ }
+
+ ParticipantManager participantManager = _participantManager;
+ if (participantManager != null) {
+ participantManager.disconnect();
+ }
+
+ cleanupDone.set(true);
+ }
+ };
+
+ // Define the state listener to terminate the cleanup thread when the
ZkConnection breaks.
+ IZkStateListener stateListener = new IZkStateListener() {
+ @Override
+ public void handleStateChanged(KeeperState state) {
+ // If the connection breaks during the cleanup , then stop the
cleanup thread.
+ if (state != KeeperState.SyncConnected) {
+ cleanupThread.interrupt();
+ }
+ }
+
+ @Override
+ public void handleNewSession(String sessionId) {
+ // nothing
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable error) {
+ // nothing
+ }
+ };
+
+ cleanupThread.start();
+ try {
+ // Subscribe and check the connection status one more time to ensure
the thread is running
+ // with an active ZkConnection.
+ _zkclient.subscribeStateChanges(stateListener);
+ if (!_zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS)) {
+ cleanupThread.interrupt();
+ }
+
+ try {
+ cleanupThread.join();
+ } catch (InterruptedException ex) {
+ cleanupThread.interrupt();
+ }
+ } finally {
+ _zkclient.unsubscribeStateChanges(stateListener);
+ }
+ } else {
+ LOG.warn(
+ "ZkClient is not connected to the Zookeeper. Skip the cleanup work
that requires accessing Zookeeper.");
+ }
+
+ return cleanupDone.get();
Review comment:
Will the call hang here as well? If there is no connection,
cleanupThread gets interrupted, but nothing ever sets this value to true.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]