jiajunwang commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r437756192
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,43 @@ protected void handleEvent(NotificationContext event) {
logger.warn("Exception in callback processing thread. Skipping
callback", e);
}
}
+
+ public boolean queueIsEmpty() {
+ return _eventQueue.isEmpty();
+ }
+ }
+
+ /**
+ * This class represents each periodic refresh task
+ * If the deadline passed, enqueue an event to do a refresh
+ * If the deadline has not passed (some events happened in between so don't
need to do a refresh),
+ * wait until reaching the interval and check again
+ */
+ class RefreshTask implements Runnable {
Review comment:
TriggerTask?
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -403,6 +463,10 @@ public void invoke(NotificationContext changeContext)
throws Exception {
subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
}
+ if (_periodicTriggerInterval > 0) {
Review comment:
This is not a direct indicator. How about checking whether
_periodicRefreshExecutor != null instead?
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
private boolean _batchModeEnabled = false;
private boolean _preFetchEnabled = true;
private HelixCallbackMonitor _monitor;
+ private final long _periodicTriggerInterval;
+ private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
Review comment:
_periodicTriggerExecutor
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
private boolean _batchModeEnabled = false;
private boolean _preFetchEnabled = true;
private HelixCallbackMonitor _monitor;
+ private final long _periodicTriggerInterval;
+ private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+ private ScheduledFuture<?> _scheduledRefreshFuture;
Review comment:
_scheduledTriggerFuture
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -171,6 +171,10 @@
private CallbackHandler _leaderElectionHandler = null;
protected final List<HelixTimerTask> _controllerTimerTasks = new
ArrayList<>();
+ /** Set this variable to negative can disable the message periodic refresh
feature */
+ private final long _messageRefreshTriggerInterval;
+ private final long DEFAULT_PERIODIC_MESSAGE_REFRESH_INTERVAL = 2 * 60 *
1000; //2 minutes
Review comment:
static
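
A minimal sketch of the suggested declaration. The holder class `IntervalDefaults` is hypothetical; only the constant's name and value come from the diff.

```java
// Hypothetical holder class; only the constant name and value come from the diff.
final class IntervalDefaults {
  // static final: a single class-level constant instead of one copy per instance.
  static final long DEFAULT_PERIODIC_MESSAGE_REFRESH_INTERVAL = 2 * 60 * 1000L; // 2 minutes

  private IntervalDefaults() {
    // no instances needed for a constants holder
  }
}
```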
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -781,8 +845,15 @@ void reset(boolean isShutdown) {
if (isShutdown) {
_batchCallbackProcessor.shutdown();
_batchCallbackProcessor = null;
+ if (_periodicRefreshExecutor != null) {
+ _periodicRefreshExecutor.shutdownNow();
+ }
} else {
_batchCallbackProcessor.resetEventQueue();
+ if (_scheduledRefreshFuture != null) {
+ _periodicRefreshExecutor.shutdownNow();
+ initRefreshTask();
Review comment:
The next event would be FINALIZE, so even if the periodic task is
triggered, it won't invoke the handling logic successfully. I think we
should not init here.
Shall we do it in the init() call instead?
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -245,6 +294,17 @@ public CallbackHandler(HelixManager manager,
RealmAwareZkClient client, Property
parseListenerProperties();
init();
+
+ /**
+ * Async batch read was designed that if an exception is thrown, the
operation fails and returns a partial result.
+ * The issue is separately addressed by checking read result in batch read
calls [https://github.com/apache/helix/pull/974]
+ * and retry batch read for ZK connectivity issues
[https://github.com/apache/helix/pull/970].
+ * We also add this one to periodical read stale messages from ZkServer in
the case we don't see any event for a certain period.
+ */
+ _periodicTriggerInterval = periodicTriggerInterval;
+ if (_periodicTriggerInterval > 0) {
+ initRefreshTask();
Review comment:
nit: pass periodicTriggerInterval to initRefreshTask() and do the
branch inside. That makes this code cleaner.
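
A rough sketch of that shape, under stated assumptions: `PeriodicTriggerSketch` is a hypothetical stand-in for CallbackHandler, the scheduled task body is a placeholder, and the use of `scheduleWithFixedDelay` is an assumption, since the diff does not show how the task is scheduled. It also shows the executor null check serving as the direct "feature enabled" indicator.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for CallbackHandler; field and method names mirror the diff.
class PeriodicTriggerSketch {
  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
  private ScheduledFuture<?> _scheduledRefreshFuture;

  PeriodicTriggerSketch(long periodicTriggerInterval) {
    // The constructor no longer branches on the interval.
    initRefreshTask(periodicTriggerInterval);
  }

  // The "is the feature enabled" branch lives inside the init method.
  private void initRefreshTask(long periodicTriggerInterval) {
    if (periodicTriggerInterval <= 0) {
      return; // non-positive interval: nothing is scheduled
    }
    _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1);
    // Assumption: fixed-delay scheduling; the real task body is not shown here.
    _scheduledRefreshFuture = _periodicRefreshExecutor.scheduleWithFixedDelay(
        () -> { /* placeholder for the real periodic task */ },
        periodicTriggerInterval, periodicTriggerInterval, TimeUnit.MILLISECONDS);
  }

  // Executor presence tells callers directly whether the task was set up.
  boolean isPeriodicTriggerEnabled() {
    return _periodicRefreshExecutor != null;
  }

  void shutdown() {
    if (_periodicRefreshExecutor != null) {
      _periodicRefreshExecutor.shutdownNow();
    }
  }
}
```

With this layout, later code such as invoke() or reset() can test the executor for null rather than re-deriving the state from the interval value.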
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -245,6 +294,17 @@ public CallbackHandler(HelixManager manager,
RealmAwareZkClient client, Property
parseListenerProperties();
init();
+
+ /**
+ * Async batch read was designed that if an exception is thrown, the
operation fails and returns a partial result.
+ * The issue is separately addressed by checking read result in batch read
calls [https://github.com/apache/helix/pull/974]
+ * and retry batch read for ZK connectivity issues
[https://github.com/apache/helix/pull/970].
+ * We also add this one to periodical read stale messages from ZkServer in
the case we don't see any event for a certain period.
+ */
+ _periodicTriggerInterval = periodicTriggerInterval;
+ if (_periodicTriggerInterval > 0) {
+ initRefreshTask();
Review comment:
initTriggerTask()?
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,43 @@ protected void handleEvent(NotificationContext event) {
logger.warn("Exception in callback processing thread. Skipping
callback", e);
}
}
+
+ public boolean queueIsEmpty() {
+ return _eventQueue.isEmpty();
+ }
+ }
+
+ /**
+ * This class represents each periodic refresh task
+ * If the deadline passed, enqueue an event to do a refresh
+ * If the deadline has not passed (some events happened in between so don't
need to do a refresh),
+ * wait until reaching the interval and check again
+ */
+ class RefreshTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ long currentTime = System.currentTimeMillis();
+ long remainingTime = _lastEventTime + _periodicTriggerInterval -
currentTime;
+ if (remainingTime <= 0) {
+ // If there is no event in the queue, meaning this long idle time
could be due to lack of events
+ // Otherwise, if there is event in the queue, this long idle time
is due to slow process of events
+ if (_batchCallbackProcessor == null ||
_batchCallbackProcessor.queueIsEmpty()) {
+ NotificationContext changeContext = new
NotificationContext(_manager);
+ changeContext.setType(Type.CALLBACK);
+ changeContext.setChangeType(_changeType);
+ enqueueTask(changeContext);
+ }
+ break;
+ } else {
+ wait(remainingTime);
+ }
+ }
+ } catch (Exception e) {
Review comment:
On an interruption (InterruptedException), we may want to exit the execution.
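
A minimal sketch of the idea, not the PR's actual code: `InterruptAwareTask` and its fields are hypothetical, and `Thread.sleep` stands in for the `wait(remainingTime)` call because `Object.wait` requires the caller to hold the object's monitor, which this sketch does not.

```java
// Hypothetical stand-in for RefreshTask; names and fields are illustrative only.
class InterruptAwareTask implements Runnable {
  private final long _waitMs;
  volatile boolean interruptedExit = false;
  volatile boolean completed = false;

  InterruptAwareTask(long waitMs) {
    _waitMs = waitMs;
  }

  @Override
  public void run() {
    try {
      // Stand-in for waiting out the remaining time before the next check.
      Thread.sleep(_waitMs);
      completed = true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the owning executor can observe it, then exit.
      Thread.currentThread().interrupt();
      interruptedExit = true;
    } catch (Exception e) {
      // Any other failure is reported; this run simply ends.
      System.err.println("Exception in periodic task: " + e);
    }
  }
}
```

Catching InterruptedException ahead of the generic handler keeps a shutdownNow() on the executor from being treated as an ordinary failure.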
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
private boolean _batchModeEnabled = false;
private boolean _preFetchEnabled = true;
private HelixCallbackMonitor _monitor;
+ private final long _periodicTriggerInterval;
+ private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+ private ScheduledFuture<?> _scheduledRefreshFuture;
+ private volatile long _lastEventTime;
Review comment:
Should this be lastInvokeTime?