jiajunwang commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r439015377



##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -405,6 +413,11 @@ void checkConnected(long timeout) {
 
   void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
       EventType[] eventType) {
+    addListener(listener, propertyKey, changeType, eventType, -1);
+  }
+
+  void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,

Review comment:
       Should this be private?

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,45 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  /**
+   * This class represents each periodic refresh task
+   * If the deadline passed, enqueue an event to do a refresh
+   * If the deadline has not passed (some events happened in between so don't need to do a refresh),
+   * wait until reaching the interval and check again
+   */
+  class TriggerTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          long remainingTime = _lastInvokeTime + _periodicTriggerInterval - currentTime;
+          if (remainingTime <= 0) {
+            // If there is no event in the queue, meaning this long idle time could be due to lack of events
+            // Otherwise, if there is event in the queue, this long idle time is due to slow process of events
+            if (_batchCallbackProcessor == null || _batchCallbackProcessor.queueIsEmpty()) {

Review comment:
       If the _batchCallbackProcessor queue is not empty but the callback
handling takes a while, this loop will busy-loop, right?
   A more graceful design is to wait until the queue becomes non-empty, then
loop to check the remaining time again.
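
A minimal sketch of blocking instead of spinning, not the PR's code. `DrainableQueue` and `awaitEmpty` are hypothetical names rather than Helix APIs, and the sketch assumes the state worth waiting for is the backlog being drained; the same wait/notify pattern applies to whichever queue state the task should wait on.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the callback event queue; not a Helix class.
class DrainableQueue<T> {
  private final Queue<T> items = new ArrayDeque<>();

  synchronized void offer(T item) {
    items.add(item);
  }

  // Removes the next item and wakes any waiter once the queue is empty.
  synchronized T poll() {
    T item = items.poll();
    if (items.isEmpty()) {
      notifyAll();
    }
    return item;
  }

  // Blocks without spinning until the queue is empty or timeoutMs elapses.
  // Returns true if the queue was empty on return.
  synchronized boolean awaitEmpty(long timeoutMs) {
    long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      while (!items.isEmpty()) {
        long remainingNs = deadlineNs - System.nanoTime();
        if (remainingNs <= 0) {
          return false;
        }
        // Releases the monitor while waiting, so producers and consumers can proceed.
        TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return items.isEmpty();
    }
  }
}
```

With something like this, the trigger loop could call `awaitEmpty(...)` and then recompute the remaining time, so the thread sleeps while callbacks are still being processed.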

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -818,4 +883,38 @@ public String getContent() {
         + ", _listener=" + _listener + ", _changeType=" + _changeType + ", _manager=" + _manager
         + ", _zkClient=" + _zkClient + '}';
   }
+
+  /**
+   * Used to shut down a periodic triggered task
+   */
+  private void shutDownTriggerTask(boolean isShutdown) {
+    if (_scheduledTriggerFuture != null) {
+      _scheduledTriggerFuture.cancel(true);
+    }
+    if (_periodicTriggerExecutor != null && isShutdown) {
+      _periodicTriggerExecutor.shutdownNow();
+      _periodicTriggerExecutor = null;
+    }
+  }
+
+  /**
+   * Used to initialize a periodic triggered task
+   * Schedule tasks in a task executor with fixed intervals
+   */
+  private void initTriggerTask() {

Review comment:
       nit: take _periodicTriggerInterval as a parameter instead of referring
to the private field. This is good style because it limits the impact of the
private field on the rest of the class, so there are fewer dependencies if you
want to make any further changes.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -403,6 +462,10 @@ public void invoke(NotificationContext changeContext) throws Exception {
         subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
 
+      if (_periodicTriggerExecutor != null) {

Review comment:
       Can we put this in the initTriggerTask() method?

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -818,4 +883,38 @@ public String getContent() {
         + ", _listener=" + _listener + ", _changeType=" + _changeType + ", _manager=" + _manager
         + ", _zkClient=" + _zkClient + '}';
   }
+
+  /**
+   * Used to shut down a periodic triggered task
+   */
+  private void shutDownTriggerTask(boolean isShutdown) {
+    if (_scheduledTriggerFuture != null) {
+      _scheduledTriggerFuture.cancel(true);
+    }
+    if (_periodicTriggerExecutor != null && isShutdown) {
+      _periodicTriggerExecutor.shutdownNow();
+      _periodicTriggerExecutor = null;
+    }
+  }
+
+  /**
+   * Used to initialize a periodic triggered task
+   * Schedule tasks in a task executor with fixed intervals
+   */
+  private void initTriggerTask() {
+    if (_periodicTriggerInterval <= 0) {
+      return;
+    }
+    shutDownTriggerTask(false);
+    _lastInvokeTime = System.currentTimeMillis();
+    if (_periodicTriggerExecutor == null) {
+      _periodicTriggerExecutor = new ScheduledThreadPoolExecutor(1);
+      // When cancelling the task future, it removes the task from the queue
+      // so we won't have a memory leakage when we cancel scheduled task
+      _periodicTriggerExecutor.setRemoveOnCancelPolicy(true);
+    }
+    _scheduledTriggerFuture = _periodicTriggerExecutor
+        .scheduleWithFixedDelay(new TriggerTask(), _periodicTriggerInterval,

Review comment:
       I overlooked this part before. Why do you need to use 
scheduleWithFixedDelay() here? The TriggerTask object will keep running in the 
thread, right?
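
For context, `scheduleWithFixedDelay()` starts the next run only after the previous run has returned, so a task whose `run()` loops forever is executed once and never rescheduled. The sketch below shows the alternative shape, where the task does a single check and the executor supplies the repetition. `FixedDelayDemo` and `countRuns` are hypothetical names for illustration, and the counter stands in for the real deadline check.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Helix.
class FixedDelayDemo {
  // Schedules a task that returns after one check and reports how often it ran.
  static int countRuns(long delayMs, long observeMs) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    executor.setRemoveOnCancelPolicy(true);
    AtomicInteger runs = new AtomicInteger();
    // No while(true) inside the task: each run does its work and returns.
    ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
        runs::incrementAndGet, delayMs, delayMs, TimeUnit.MILLISECONDS);
    try {
      Thread.sleep(observeMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      future.cancel(true);
      executor.shutdownNow();
    }
    return runs.get();
  }
}
```

If the task needs its own loop instead, a plain `execute()` or `submit()` on a single thread would express that more directly than a fixed-delay schedule.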

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +206,45 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  /**
+   * This class represents each periodic refresh task
+   * If the deadline passed, enqueue an event to do a refresh
+   * If the deadline has not passed (some events happened in between so don't need to do a refresh),
+   * wait until reaching the interval and check again
+   */
+  class TriggerTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          long remainingTime = _lastInvokeTime + _periodicTriggerInterval - currentTime;

Review comment:
       To reduce the usage of _periodicTriggerInterval in the whole callback 
class, could you please pass the interval as a constructor parameter? That 
would be a cleaner design.
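
A minimal sketch of that shape, with hypothetical names (`IntervalTask`, `remainingTime`) rather than the PR's actual classes. Passing the last-invoke time through a `LongSupplier` is an extra assumption beyond the request; it only keeps the example free of outer-class fields.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified version of the task under review; not Helix code.
class IntervalTask {
  private final long intervalMs;
  private final LongSupplier lastInvokeTimeMs;

  // The interval is fixed at construction instead of read from an outer-class field.
  IntervalTask(long intervalMs, LongSupplier lastInvokeTimeMs) {
    this.intervalMs = intervalMs;
    this.lastInvokeTimeMs = lastInvokeTimeMs;
  }

  // Positive while the deadline is still ahead, zero or negative once it has passed.
  long remainingTime(long nowMs) {
    return lastInvokeTimeMs.getAsLong() + intervalMs - nowMs;
  }
}
```

The task then depends only on what it is given, which also makes the deadline arithmetic easy to unit test with fixed timestamps.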




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


