mgao0 commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r435390675
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +208,37 @@ protected void handleEvent(NotificationContext event) {
logger.warn("Exception in callback processing thread. Skipping
callback", e);
}
}
+
+ public boolean queueIsEmpty() {
+ return _eventQueue.isEmpty();
+ }
+ }
+
+ class RefreshTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ long currentTime = System.currentTimeMillis();
+ long remainingTime = _lastEventTime +
_periodicRefreshTriggerInterval - currentTime;
+ if (remainingTime <= 0) {
+ // If there is no event in the queue, meaning this long idle time
could be due to lack of events
+ // Otherwise, if there is event in the queue, this long idle time
is due to slow process of events
+ if (_batchCallbackProcessor == null ||
_batchCallbackProcessor.queueIsEmpty()) {
+ NotificationContext changeContext = new
NotificationContext(_manager);
+ changeContext.setType(Type.CALLBACK);
+ changeContext.setChangeType(_changeType);
+ enqueueTask(changeContext);
Review comment:
It will call invoke method, and we synchronize Helix manager inside the
invoke method, so there shouldn't be a problem.
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -693,15 +758,15 @@ public void handleDataChange(String dataPath, Object
data) {
try {
updateNotificationTime(System.nanoTime());
if (dataPath != null && dataPath.startsWith(_path)) {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- changeContext.setPathChanged(dataPath);
- changeContext.setChangeType(_changeType);
- enqueueTask(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling data-change. path: " + dataPath + ",
listener: " + _listener;
+ NotificationContext changeContext = new NotificationContext(_manager);
Review comment:
Sorry didn't mean to change this code. Updated.
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +135,10 @@
private boolean _batchModeEnabled = false;
private boolean _preFetchEnabled = true;
private HelixCallbackMonitor _monitor;
+ private final long _periodicRefreshTriggerInterval;
Review comment:
Didn't get what you meant before. Changed it.
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -548,14 +561,14 @@ public void
addConfigChangeListener(org.apache.helix.ScopedConfigChangeListener
@Override
public void addMessageListener(MessageListener listener, String
instanceName) {
addListener(listener, new Builder(_clusterName).messages(instanceName),
ChangeType.MESSAGE,
- new EventType[] { EventType.NodeChildrenChanged });
+ new EventType[] { EventType.NodeChildrenChanged },
_messageRefreshTriggerInterval);
}
@Deprecated
@Override
public void addMessageListener(org.apache.helix.MessageListener listener,
String instanceName) {
addListener(listener, new Builder(_clusterName).messages(instanceName),
ChangeType.MESSAGE,
- new EventType[] { EventType.NodeChildrenChanged });
+ new EventType[] { EventType.NodeChildrenChanged },
_messageRefreshTriggerInterval);
Review comment:
Good idea. This is better way to do for the deprecated listener. Updated
it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]