narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r422451164



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -109,7 +109,11 @@
     // 
https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. This is the 
global value
+    // that's used when participants don't specify their individual pool sizes.

Review comment:
       Let's clarify which config takes precedence and also what it would mean 
to not have this field defined.
   
   For example, consider the following:
   1. What if an instance has a target value set in InstanceConfig and there is 
a global thread pool size config set as well?
   2. What if nothing is set at all? Which value will this use?
   etc.
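
   To make the cases above concrete, here is a minimal sketch of the lookup order that TaskUtil.getTargetThreadPoolSize in this PR appears to follow (InstanceConfig first, then ClusterConfig, then the default). The class and method names are hypothetical, and -1 as the "not set" sentinel is an assumption, since the value of the NOT_SET constant is not shown in this diff. The default of 40 matches the removed TASK_THREADPOOL_SIZE constant.

```java
// Hypothetical sketch, not Helix code: models the precedence only.
class TargetPoolSizePrecedence {
  // Assumed sentinel for "field absent"; the real constant's value is not shown in the diff.
  static final int NOT_SET = -1;
  // 40 mirrors the TASK_THREADPOOL_SIZE constant that this PR removes.
  static final int DEFAULT_POOL_SIZE = 40;

  // Instance value wins if positive, then the cluster-wide value, then the default.
  static int resolve(int instanceValue, int clusterValue) {
    if (instanceValue > 0) {
      return instanceValue;
    }
    if (clusterValue > 0) {
      return clusterValue;
    }
    return DEFAULT_POOL_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(10, 20));           // both set: instance value is used
    System.out.println(resolve(NOT_SET, 20));      // only cluster value set
    System.out.println(resolve(NOT_SET, NOT_SET)); // nothing set: default is used
  }
}
```

   Spelling out these three cases in the Javadoc would answer both questions.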

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,33 @@ public void setInstanceCapacityKeys(List<String> 
capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), 
capacityKeys);
   }
 
+  /**
+   * Get the global target size of task thread pools. This values applies to 
participants and is
+   * overwritten by participants' own values if they specified individual pool 
sizes in
+   * InstanceConfigs

Review comment:
       What if this value is not set?

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,33 @@ public void setInstanceCapacityKeys(List<String> 
capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), 
capacityKeys);
   }
 
+  /**
+   * Get the global target size of task thread pools. This values applies to 
participants and is
+   * overwritten by participants' own values if they specified individual pool 
sizes in
+   * InstanceConfigs
+   * @return the global target size of task thread pool
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    return _record
+        
.getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the global target size of task thread pools for this cluster.
+   * @param globalTargetTaskThreadPoolSize - the new global target task thread 
pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size 
is not greater than 0
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int 
globalTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (globalTargetTaskThreadPoolSize <= 0) {
+      throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must 
be greater than 0!");
+    }

Review comment:
       Technically, I think the value 0 is also valid. It just wouldn't process 
any tasks. What do you think?
   
   I think we might be able to use this config as a workaround to "disable" 
the Task Framework globally, or to have only certain participants process 
tasks: say the global value is 0, but certain participants have their 
individual configs set.
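
   A rough sketch of what allowing 0 could look like, assuming a negative sentinel (-1 here, which is an assumption) still means "not set". The class and method names are hypothetical. Note that the lookup would then need `>= 0` rather than `> 0`, otherwise an explicit 0 would be treated as unset and fall through to the next level.

```java
// Hypothetical sketch, not Helix code: 0 is a legal value, negatives are rejected.
class ZeroPoolSizeSketch {
  static final int NOT_SET = -1;           // assumed sentinel for an absent field
  static final int DEFAULT_POOL_SIZE = 40; // mirrors the removed TASK_THREADPOOL_SIZE

  // Setter-side check: reject only negative sizes.
  static int validate(int size) {
    if (size < 0) {
      throw new IllegalArgumentException("Thread pool size must not be negative!");
    }
    return size;
  }

  // Lookup that treats an explicit 0 as a real value instead of "unset".
  static int resolve(int instanceValue, int clusterValue) {
    if (instanceValue >= 0) {
      return instanceValue;
    }
    if (clusterValue >= 0) {
      return clusterValue;
    }
    return DEFAULT_POOL_SIZE;
  }

  public static void main(String[] args) {
    int global = validate(0);
    System.out.println(resolve(NOT_SET, global)); // participant without its own config
    System.out.println(resolve(8, global));       // participant that opts in with 8 threads
  }
}
```

   With the global value at 0, only the participants that set their own size would report a non-zero target.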

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
##########
@@ -49,4 +49,11 @@
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+  /**
+   * The default task thread pool size that will be used to create thread 
pools if target thread
+   * pool sizes are not defined in InstanceConfig or ClusterConfig; also used 
as the current thread
+   * pool size default value if the current thread pool size is not defined in 
LiveInstance

Review comment:
       Good description! :)

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new 
ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }
+
+  private void initializeTaskMonitor() {
+    if (_taskExecutor instanceof ThreadPoolExecutor) {
+      try {
+        _monitor = new 
ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
+            (ThreadPoolExecutor) _taskExecutor);
+      } catch (JMException e) {
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for 
TaskStateModelFactory.");
+      }
+    }
+  }
+
+  /*
+   * Create a config accessor to get the thread pool size
+   */
+  protected ConfigAccessor createConfigAccessor() {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      String clusterName = _manager.getClusterName();
+      String shardingKey = clusterName.charAt(0) == '/' ? clusterName : "/" + 
clusterName;

Review comment:
       Cluster names don't contain a "/", so is this ternary check necessary?
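
   For illustration, a small sketch comparing the two forms. It assumes the cluster name never starts with "/"; the class and method names are hypothetical. The two forms only differ when the name already has a leading slash.

```java
// Hypothetical sketch, not Helix code: sharding key derivation from a cluster name.
class ShardingKeySketch {
  // Form used in the PR: prepend "/" only when it is missing.
  static String withTernary(String clusterName) {
    return clusterName.charAt(0) == '/' ? clusterName : "/" + clusterName;
  }

  // Simplified form: valid only if a cluster name never starts with "/".
  static String simplified(String clusterName) {
    return "/" + clusterName;
  }

  public static void main(String[] args) {
    // Identical results for a name without a leading slash.
    System.out.println(withTernary("MyCluster"));
    System.out.println(simplified("MyCluster"));
  }
}
```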

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,39 +48,29 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> 
taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new 
ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
-          }
-        }));
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    // TODO: revisit the logic here; we are creating a connection although we 
already have a manager

Review comment:
       It would be a good idea to give more context:
   
   The extra connection is only needed because we don't enforce the order in 
which the manager should be connected. Some users register a 
taskFactoryRegistry before connecting the manager, others do so after. Either 
works. This is only a problem in the former case. 
   So the real TODO is to decide what the right order should be and whether we 
should enforce it. Enforcing it would be backward incompatible, but arguably 
cleaner, because we would no longer have to create an extra ZK connection.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,39 +48,29 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> 
taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new 
ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
-          }
-        }));
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    // TODO: revisit the logic here; we are creating a connection although we 
already have a manager
+    ConfigAccessor configAccessor = createConfigAccessor();
+    int threadPoolSize = TaskUtil.getTargetThreadPoolSize(configAccessor, 
_manager.getClusterName(),
+        _manager.getInstanceName());
+    configAccessor.close();
+    _taskExecutor = createTaskExecutor(threadPoolSize);
+    _timerTaskExecutor = createTimerTaskExecutor();
+    initializeTaskMonitor();
   }
 
+  // FIXME: DO NOT USE! This size of provided thread pool will not be 
reflected to controller
+  // properly, the controller may over schedule tasks to this participant.

Review comment:
       Let's also add that we want to avoid using this because Task Framework 
needs to have full control of the thread pool, unlike the state transition 
thread pool.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {

Review comment:
       Should we allow 0?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(

Review comment:
       Nit: the trailing "Exception: " is not necessary since you're already 
passing the exception as the last parameter. I think Log4j formats it already.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance 
{} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the 
value
+    try {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = 
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {

Review comment:
       0?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance 
{} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the 
value
+    try {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = 
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default 
value. ",

Review comment:
       It would be nice to include the default size constant here.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance 
{} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the 
value
+    try {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = 
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default 
value. ",
+            clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching ClusterConfig in cluster 
{}. Returning default value. Exception: ",

Review comment:
       Say what the default value is, and no need for "Exception: ".

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, 
long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get 
it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, 
String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance 
{} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the 
value
+    try {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = 
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default 
value. ",

Review comment:
       TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new 
ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }

Review comment:
       Consider adding a TODO here - I'm not sure why this needs to be a single 
thread executor. We could certainly use more threads for timer tasks, but let's 
tackle this at a later point of this project.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new 
ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }
+
+  private void initializeTaskMonitor() {
+    if (_taskExecutor instanceof ThreadPoolExecutor) {
+      try {
+        _monitor = new 
ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
+            (ThreadPoolExecutor) _taskExecutor);
+      } catch (JMException e) {
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for 
TaskStateModelFactory.");

Review comment:
       Nit: let's pass e as the second argument to LOG.warn.

##########
File path: 
helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws 
Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE 
+ 1;

Review comment:
       Nit: it would help readability if you could explain why you're adding 1 
here. I believe you're just trying to use a value that's not 40?

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
##########
@@ -149,6 +150,7 @@ public void beforeClass() throws Exception {
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
         "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace);
 
+    HttpRoutingDataReader.reset();

Review comment:
       How is this test relevant to this PR?

##########
File path: 
helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws 
Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE 
+ 1;
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<>(ZK_ADDR));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    final String instanceName = "localhost_12918";
+    final MockParticipantManager manager =
+        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+    InstanceConfig instanceConfig = 
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), 
instanceConfig);
+
+    manager.syncStart();

Review comment:
       Are you stopping this manager and cleaning up this cluster after the 
test?

##########
File path: 
helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws 
Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE 
+ 1;
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<>(ZK_ADDR));

Review comment:
       If possible, could we start phasing out deprecated constructors (I 
believe this is deprecated)?

##########
File path: 
helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
##########
@@ -123,4 +123,28 @@ public void testSetInstanceCapacityMapInvalid() {
     InstanceConfig testConfig = new InstanceConfig("testConfig");
     testConfig.setInstanceCapacityMap(capacityDataMap);
   }
+
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.getRecord().setIntField(
+        
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 
100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(0);

Review comment:
       We should consider allowing the value 0. Negative values don't make any 
sense so those should still not be allowed.

##########
File path: 
helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
##########
@@ -69,6 +69,30 @@ public void testSetCapacityKeysEmptyList() {
     testConfig.setInstanceCapacityKeys(Collections.emptyList());
   }
 
+  @Test
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord().setIntField(
+        
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 
100);
+
+    Assert.assertEquals(testConfig.getGlobalTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 
-1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(0);

Review comment:
       We should maybe consider allowing the value 0. Negative values don't 
make any sense so those should still not be allowed.

##########
File path: 
helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify 
correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: TestMultiZkHelixJavaApis already defined MSDS_SERVER_ENDPOINT, 
which goes into
+    // HttpRoutingDataReader and is recorded as final. As a result this test 
case has to use the
+    // same endpoint. There's no workaround at this moment.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    Map<String, Collection<String>> routingData = new HashMap<>();
+    routingData
+        .put(ZK_ADDR, Collections.singletonList("/" + 
anyParticipantManager.getClusterName()));
+    MockMetadataStoreDirectoryServer msds =
+        new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, 
msdsNamespace, routingData);
+    msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = 
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        
System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    // Turn on multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // MSDS endpoint: 
http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace);
+
+    HttpRoutingDataReader.reset();
+    TaskStateModelFactory taskStateModelFactory =
+        new TaskStateModelFactory(anyParticipantManager, 
Collections.emptyMap());
+    ConfigAccessor configAccessor = 
taskStateModelFactory.createConfigAccessor();
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(configAccessor, 
anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, 
prevMultiZkEnabled);
+    }
+    if (prevMsdsServerEndpoint == null) {
+      System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+    } else {
+      System.setProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY, 
prevMsdsServerEndpoint);
+    }
+    msds.stopServer();
+  }
+
+  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+  public void testConfigAccessorCreationSingleZk() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = 
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    // Turn on multiZk mode in System config

Review comment:
       Did you mean to say turn "off"? :)
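   
   Related: both tests repeat the same save/override/restore steps for system 
properties. One possible way to factor that out is sketched below. 
`ScopedSystemProperty` and the property key in the usage note are hypothetical 
names, and the sketch assumes the single-ZK test sets the flag to "false":

```java
// Hypothetical helper: overrides a JVM system property and restores the
// previous value (or clears it if there was none) when closed.
final class ScopedSystemProperty implements AutoCloseable {
  private final String key;
  private final String previous;

  ScopedSystemProperty(String key, String value) {
    this.key = key;
    // Remember what was set before so close() can put it back.
    this.previous = System.getProperty(key);
    System.setProperty(key, value);
  }

  @Override
  public void close() {
    if (previous == null) {
      System.clearProperty(key);
    } else {
      System.setProperty(key, previous);
    }
  }
}
```

   Usage would be a try-with-resources block such as 
`try (ScopedSystemProperty p = new ScopedSystemProperty("example.multiZk.enabled", "false")) { ... }`, 
so the restore also runs when an assertion fails.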

##########
File path: 
helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify 
correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: TestMultiZkHelixJavaApis already defined MSDS_SERVER_ENDPOINT, 
which goes into
+    // HttpRoutingDataReader and is recorded as final. As a result this test 
case has to use the
+    // same endpoint. There's no workaround at this moment.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";

Review comment:
       This is okay. I'm not sure we want to make it a TODO, but 
"multiZkTest" is a generic enough name, so I'm not too bothered by this at 
the moment. The reason this is tricky is that in production, in order to set up 
a different MSDS, you would have to change the JVM config and restart the JVM, 
thereby updating the `final` field.
   
   I think the right TODO here is this: now that we have a few tests that need 
to use the MSDS, it would be a good idea to move the MSDS creation to the 
abstract test base so that it is transparent to all tests, rather than tied to 
a single test of multi-ZK functionality. What do you think? That way, values 
like msdsHostName, port, and namespace would all be accessible to child 
classes, so you wouldn't have to redefine them (just like we do with ZK_ADDR 
and such).
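   
   Roughly what I am picturing, as a sketch only. `MsdsTestBase` and 
`ExampleMultiZkTest` are hypothetical names, and the actual server 
start/stop would still need to be wired into the real test base:

```java
// Hypothetical shared base class; in practice these constants would sit in
// the existing abstract test base next to ZK_ADDR.
abstract class MsdsTestBase {
  protected static final String MSDS_HOST_NAME = "localhost";
  protected static final int MSDS_PORT = 11117;
  protected static final String MSDS_NAMESPACE = "multiZkTest";

  // Builds the endpoint string from the shared constants.
  protected static String msdsEndpoint() {
    return "http://" + MSDS_HOST_NAME + ":" + MSDS_PORT
        + "/admin/v2/namespaces/" + MSDS_NAMESPACE;
  }
}

// A child test reuses the shared values instead of redefining them.
final class ExampleMultiZkTest extends MsdsTestBase {
  static String endpointUnderTest() {
    return msdsEndpoint();
  }
}
```

   Since every test would read the same constants, they would all agree on 
the one endpoint that ends up recorded in the `final` field.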

##########
File path: 
helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify 
correctness

Review comment:
       Great!






