jiajunwang commented on a change in pull request #1000:
URL: https://github.com/apache/helix/pull/1000#discussion_r434988746



##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -210,12 +249,25 @@ protected void handleEvent(NotificationContext event) {
 
   public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
-    this(manager, client, propertyKey, listener, eventTypes, changeType, null);
+    this(manager, client, propertyKey, listener, eventTypes, changeType, null, -1);
+  }
+
+  public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,

Review comment:
       I don't think you need this overload.
   Just add the following one:
   ```
   public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
         Object listener, EventType[] eventTypes, ChangeType changeType, HelixCallbackMonitor monitor,
         long periodicRefreshTriggerInterval)
   ```
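
A minimal sketch of the constructor chaining this suggestion implies. `HandlerSketch` and its fields are hypothetical stand-ins, not the real `CallbackHandler`, and treating a non-positive interval as "disabled" is an assumption made for illustration only.

```java
// Hypothetical stand-in for CallbackHandler; only the constructor chaining is illustrated.
class HandlerSketch {
  // Assumed sentinel: a negative interval means periodic triggering is disabled.
  static final long DISABLED = -1L;

  final String listener;
  final Object monitor;
  final long periodicTriggerInterval;

  // Existing signature, kept so current callers compile unchanged.
  HandlerSketch(String listener) {
    this(listener, null, DISABLED);
  }

  // The single new overload that accepts both optional arguments.
  HandlerSketch(String listener, Object monitor, long periodicTriggerInterval) {
    this.listener = listener;
    this.monitor = monitor;
    this.periodicTriggerInterval = periodicTriggerInterval;
  }

  // Assumption of this sketch: only a positive interval enables the feature.
  boolean periodicTriggerEnabled() {
    return periodicTriggerInterval > 0;
  }
}
```

With this shape, every shorter constructor delegates directly to the full one, so there is no intermediate overload to maintain.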

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -693,15 +758,15 @@ public void handleDataChange(String dataPath, Object data) {
     try {
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path)) {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        changeContext.setPathChanged(dataPath);
-        changeContext.setChangeType(_changeType);
-        enqueueTask(changeContext);
-      }
-    } catch (Exception e) {
-      String msg =
-          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+      NotificationContext changeContext = new NotificationContext(_manager);

Review comment:
       I went back and checked your PR description. You did check the "Code Quality" bullet. Please make sure that when you check it, you have actually done what it says.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -405,6 +413,11 @@ void checkConnected(long timeout) {
 
   void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
       EventType[] eventType) {
+    addListener(listener, propertyKey, changeType, eventType, -1);
+  }
+
+  void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
+      EventType[] eventType, long periodicRefreshTriggerInterval) {

Review comment:
       "Refresh" and "Trigger" are somewhat redundant.
   I think we should just call it periodicTriggerInterval to keep it generic.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -548,14 +561,14 @@ public void addConfigChangeListener(org.apache.helix.ScopedConfigChangeListener
   @Override
   public void addMessageListener(MessageListener listener, String instanceName) {
     addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
-        new EventType[] { EventType.NodeChildrenChanged });
+        new EventType[] { EventType.NodeChildrenChanged }, _messageRefreshTriggerInterval);
   }
 
   @Deprecated
   @Override
   public void addMessageListener(org.apache.helix.MessageListener listener, String instanceName) {
     addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
-        new EventType[] { EventType.NodeChildrenChanged });
+        new EventType[] { EventType.NodeChildrenChanged }, _messageRefreshTriggerInterval);

Review comment:
       Optional, but I guess we don't need to add support for the deprecated methods.
   
   One more question: why not just cast the deprecated listener class to the newer one? That way it becomes maintenance-free.
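
A rough sketch of what delegating by cast could look like. It assumes the deprecated listener type extends the newer one; `NewListener`, `OldListener`, and `ManagerSketch` are hypothetical names used only for illustration, not Helix classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical newer listener interface.
interface NewListener {
  void onMessage(String instanceName);
}

// Hypothetical deprecated interface; the sketch assumes it extends the newer one.
@Deprecated
interface OldListener extends NewListener {
}

class ManagerSketch {
  final List<NewListener> registered = new ArrayList<>();

  // The only overload that carries real registration logic.
  void addMessageListener(NewListener listener, String instanceName) {
    registered.add(listener);
  }

  // The deprecated overload only upcasts and delegates, so later changes
  // to the registration logic never need to be repeated here.
  @Deprecated
  void addMessageListener(OldListener listener, String instanceName) {
    addMessageListener((NewListener) listener, instanceName);
  }
}
```

The explicit upcast is what selects the non-deprecated overload; without it the call would resolve back to the deprecated method and recurse.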

##########
File path: helix-core/src/test/java/org/apache/helix/TestPeriodicRefresh.java
##########
@@ -0,0 +1,232 @@
+package org.apache.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+
+
+/**
+ * This class tests that if there are no incoming events, the onMessage method in message listener will be called by message periodic refresh
+ */
+public class TestPeriodicRefresh extends ZkUnitTestBase {

Review comment:
       @kaisun2000 since you raised this concern, please review this test case and verify whether it is sufficient : )

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +208,37 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  class RefreshTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          long remainingTime = _lastEventTime + _periodicRefreshTriggerInterval - currentTime;
+          if (remainingTime <= 0) {
+            // If there is no event in the queue, meaning this long idle time could be due to lack of events
+            // Otherwise, if there is event in the queue, this long idle time is due to slow process of events
+            if (_batchCallbackProcessor == null || _batchCallbackProcessor.queueIsEmpty()) {
+              NotificationContext changeContext = new NotificationContext(_manager);
+              changeContext.setType(Type.CALLBACK);
+              changeContext.setChangeType(_changeType);
+              enqueueTask(changeContext);
+            }
+            break;
+          } else {
+            Thread.sleep(remainingTime);

Review comment:
       I know that you are not holding any lock here, but I would prefer using wait(remainingTime) instead of Thread.sleep(remainingTime).
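
One possible shape for a `wait`-based idle timer, shown as a sketch only. `IdleTimerSketch` and its methods are hypothetical and not part of `CallbackHandler`. Note that `Object.wait(long)` must be called while holding the object's monitor, so the sketch introduces a private lock object; the benefit over `Thread.sleep` is that a new event or a shutdown can wake the waiter early via `notifyAll()`.

```java
// Hypothetical helper, not part of CallbackHandler.
class IdleTimerSketch {
  private final Object lock = new Object();
  private final long intervalMs;
  private long lastEventTime = System.currentTimeMillis();
  private boolean stopped = false;

  IdleTimerSketch(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  // Called when a real event arrives; restarts the idle countdown and wakes the waiter.
  void recordEvent() {
    synchronized (lock) {
      lastEventTime = System.currentTimeMillis();
      lock.notifyAll();
    }
  }

  // Wakes the waiter immediately so it can exit without sleeping out the interval.
  void stop() {
    synchronized (lock) {
      stopped = true;
      lock.notifyAll();
    }
  }

  // Returns true once the interval has elapsed with no event,
  // or false if stop() was called or the thread was interrupted.
  boolean awaitIdle() {
    synchronized (lock) {
      try {
        while (!stopped) {
          long remaining = lastEventTime + intervalMs - System.currentTimeMillis();
          if (remaining <= 0) {
            return true;
          }
          // Releases the lock while waiting; the loop re-checks state on every wake-up.
          lock.wait(remaining);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return false;
    }
  }
}
```

The loop around `wait` also covers spurious wake-ups, since the remaining time is recomputed each time.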

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -131,6 +135,10 @@
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
+  private final long _periodicRefreshTriggerInterval;

Review comment:
       I think I mentioned in some earlier comments that "Refresh" is not general enough for the CallbackHandler. Let's just call it _periodicTriggerInterval.

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -171,6 +171,10 @@
   private CallbackHandler _leaderElectionHandler = null;
   protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>();
 
+  /** Set this variable to negative can disable the message periodic refresh feature */
+  private long _messageRefreshTriggerInterval;

Review comment:
       Should this be final?

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -200,6 +208,37 @@ protected void handleEvent(NotificationContext event) {
         logger.warn("Exception in callback processing thread. Skipping callback", e);
       }
     }
+
+    public boolean queueIsEmpty() {
+      return _eventQueue.isEmpty();
+    }
+  }
+
+  class RefreshTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          long remainingTime = _lastEventTime + _periodicRefreshTriggerInterval - currentTime;
+          if (remainingTime <= 0) {
+            // If there is no event in the queue, meaning this long idle time could be due to lack of events
+            // Otherwise, if there is event in the queue, this long idle time is due to slow process of events
+            if (_batchCallbackProcessor == null || _batchCallbackProcessor.queueIsEmpty()) {
+              NotificationContext changeContext = new NotificationContext(_manager);
+              changeContext.setType(Type.CALLBACK);
+              changeContext.setChangeType(_changeType);
+              enqueueTask(changeContext);

Review comment:
       Still the same question: if batch mode is false, this will run in parallel with the ZKEvent thread, right? How do we avoid conflicts?
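
For illustration, one common way to keep two producer threads from running a callback concurrently is to serialize the invocation behind a single monitor. This is a sketch under that assumption, not the approach taken in the PR; `SerializedCallbackSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: both threads funnel through one synchronized entry point.
class SerializedCallbackSketch {
  private final List<String> handled = new ArrayList<>();

  // Stand-in for the listener invocation; only one thread may be inside at a time.
  synchronized void invoke(String source) {
    handled.add(source);
  }

  synchronized int handledCount() {
    return handled.size();
  }

  // Runs two threads that play the roles of the ZooKeeper event thread
  // and the periodic trigger task, then reports how many calls were handled.
  static int runDemo(int perThread) {
    SerializedCallbackSketch sketch = new SerializedCallbackSketch();
    Runnable zkEvents = () -> {
      for (int i = 0; i < perThread; i++) {
        sketch.invoke("zk-event");
      }
    };
    Runnable periodic = () -> {
      for (int i = 0; i < perThread; i++) {
        sketch.invoke("periodic-trigger");
      }
    };
    Thread a = new Thread(zkEvents);
    Thread b = new Thread(periodic);
    a.start();
    b.start();
    try {
      a.join();
      b.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sketch.handledCount();
  }
}
```

Without the `synchronized` keyword, the unsynchronized `ArrayList` could lose updates or throw under concurrent writes, which is the kind of conflict the question is about.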

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
##########
@@ -693,15 +758,15 @@ public void handleDataChange(String dataPath, Object data) {
     try {
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path)) {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        changeContext.setPathChanged(dataPath);
-        changeContext.setChangeType(_changeType);
-        enqueueTask(changeContext);
-      }
-    } catch (Exception e) {
-      String msg =
-          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+      NotificationContext changeContext = new NotificationContext(_manager);

Review comment:
       Please fix the formatting here.






