jiajunwang commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r437756192



##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,43 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping 
callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  /**
+   * This class represents each periodic refresh task
+   * If the deadline passed, enqueue an event to do a refresh
+   * If the deadline has not passed (some events happened in between so don't 
need to do a refresh),
+   * wait until reaching the interval and check again
+   */
+  class RefreshTask implements Runnable {

Review comment:
       TriggerTask?

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -403,6 +463,10 @@ public void invoke(NotificationContext changeContext) 
throws Exception {
         subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
 
+      if (_periodicTriggerInterval > 0) {

Review comment:
       This is not the direct indicator, how about check if 
_periodicRefreshExecutor != null?

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
+  private final long _periodicTriggerInterval;
+  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;

Review comment:
       _periodicTriggerExecutor

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
+  private final long _periodicTriggerInterval;
+  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+  private ScheduledFuture<?> _scheduledRefreshFuture;

Review comment:
       _scheduledTriggerFuture

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -171,6 +171,10 @@
   private CallbackHandler _leaderElectionHandler = null;
   protected final List<HelixTimerTask> _controllerTimerTasks = new 
ArrayList<>();
 
+  /** Set this variable to negative can disable the message periodic refresh 
feature */
+  private final long _messageRefreshTriggerInterval;
+  private final long DEFAULT_PERIODIC_MESSAGE_REFRESH_INTERVAL = 2 * 60 * 
1000; //2 minutes

Review comment:
       static

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -781,8 +845,15 @@ void reset(boolean isShutdown) {
           if (isShutdown) {
             _batchCallbackProcessor.shutdown();
             _batchCallbackProcessor = null;
+            if (_periodicRefreshExecutor != null) {
+              _periodicRefreshExecutor.shutdownNow();
+            }
           } else {
             _batchCallbackProcessor.resetEventQueue();
+            if (_scheduledRefreshFuture != null) {
+              _periodicRefreshExecutor.shutdownNow();
+              initRefreshTask();

Review comment:
       The next event would be FINALIZE, so even the periodic task is 
triggered, it won't really invoke the handling logic successfully. I think we 
shall not to init here.
   
   Shall we do it in the init() call?
   
    

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -245,6 +294,17 @@ public CallbackHandler(HelixManager manager, 
RealmAwareZkClient client, Property
     parseListenerProperties();
 
     init();
+
+    /**
+     * Async batch read was designed that if an exception is thrown, the 
operation fails and returns a partial result.
+     * The issue is separately addressed by checking read result in batch read 
calls [https://github.com/apache/helix/pull/974]
+     * and retry batch read for ZK connectivity issues 
[https://github.com/apache/helix/pull/970].
+     * We also add this one to periodical read stale messages from ZkServer in 
the case we don't see any event for a certain period.
+     */
+    _periodicTriggerInterval = periodicTriggerInterval;
+    if (_periodicTriggerInterval > 0) {
+      initRefreshTask();

Review comment:
       nit, pass the periodicTriggerInterval to initRefreshTask() and do the 
branch inside. That makes this code cleaner.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -245,6 +294,17 @@ public CallbackHandler(HelixManager manager, 
RealmAwareZkClient client, Property
     parseListenerProperties();
 
     init();
+
+    /**
+     * Async batch read was designed that if an exception is thrown, the 
operation fails and returns a partial result.
+     * The issue is separately addressed by checking read result in batch read 
calls [https://github.com/apache/helix/pull/974]
+     * and retry batch read for ZK connectivity issues 
[https://github.com/apache/helix/pull/970].
+     * We also add this one to periodical read stale messages from ZkServer in 
the case we don't see any event for a certain period.
+     */
+    _periodicTriggerInterval = periodicTriggerInterval;
+    if (_periodicTriggerInterval > 0) {
+      initRefreshTask();

Review comment:
       initTriggerTask()?

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,43 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping 
callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  /**
+   * This class represents each periodic refresh task
+   * If the deadline passed, enqueue an event to do a refresh
+   * If the deadline has not passed (some events happened in between so don't 
need to do a refresh),
+   * wait until reaching the interval and check again
+   */
+  class RefreshTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          long remainingTime = _lastEventTime + _periodicTriggerInterval - 
currentTime;
+          if (remainingTime <= 0) {
+            // If there is no event in the queue, meaning this long idle time 
could be due to lack of events
+            // Otherwise, if there is event in the queue, this long idle time 
is due to slow process of events
+            if (_batchCallbackProcessor == null || 
_batchCallbackProcessor.queueIsEmpty()) {
+              NotificationContext changeContext = new 
NotificationContext(_manager);
+              changeContext.setType(Type.CALLBACK);
+              changeContext.setChangeType(_changeType);
+              enqueueTask(changeContext);
+            }
+            break;
+          } else {
+            wait(remainingTime);
+          }
+        }
+      } catch (Exception e) {

Review comment:
       For the interruption exception, we may want to exit the execution.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +133,10 @@
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
+  private final long _periodicTriggerInterval;
+  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+  private ScheduledFuture<?> _scheduledRefreshFuture;
+  private volatile long _lastEventTime;

Review comment:
       Should be lastInvokeTime?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to