jiajunwang commented on a change in pull request #1326:
URL: https://github.com/apache/helix/pull/1326#discussion_r483347389



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -74,9 +79,8 @@ public void process(ClusterEvent event) throws Exception {
               cache.getResourceConfig(resourceName));
           resourceMap.put(resourceName, resource);
 
-          if (!idealState.isValid() && !isTaskCache
-              || idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-              || !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
+          if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()

Review comment:
       Hmm... this condition was not readable, and even the new one is confusing. Could you please add a comment here for future readers?
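
   For illustration, a minimal sketch of what such a comment could document (a hypothetical condensation of the three-clause check, with stand-in names; not the actual Helix code):

```java
public class RebalanceCheckSketch {
  // Hypothetical condensation of the original three-clause condition:
  // - the task pipeline (isTaskCache) should only pick up resources using the
  //   task state model;
  // - the regular pipeline should pick up everything else, including invalid
  //   IdealStates (so they can be cleaned up later).
  static boolean shouldRebalance(boolean isTaskCache, boolean isValidIdealState,
      boolean isTaskStateModel) {
    return isTaskCache ? isTaskStateModel : (!isValidIdealState || !isTaskStateModel);
  }

  public static void main(String[] args) {
    // Task pipeline only takes task-state-model resources.
    System.out.println(shouldRebalance(true, true, true));   // true
    System.out.println(shouldRebalance(true, true, false));  // false
    // Regular pipeline takes non-task resources and invalid IdealStates.
    System.out.println(shouldRebalance(false, true, false)); // true
    System.out.println(shouldRebalance(false, false, true)); // true
  }
}
```

   A comment along these lines next to the condition would make the intent of each branch explicit.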

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
##########
@@ -22,6 +22,7 @@
 public enum AttributeName {
   RESOURCES,
   RESOURCES_TO_REBALANCE,
+  TASK_RESOURCES_TO_DROP,

Review comment:
       Is it possible to just check in the task dispatch stage and remove all the resources that are in RESOURCES_TO_REBALANCE but have no ideal state?
   That way we wouldn't need this attribute.
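
   To illustrate the alternative (a hypothetical sketch with stand-in types; not the actual Helix API), the dispatch stage could derive the drop set itself by diffing RESOURCES_TO_REBALANCE against the known ideal states:

```java
import java.util.HashSet;
import java.util.Set;

public class DropSetSketch {
  // Hypothetical: resources present in RESOURCES_TO_REBALANCE but without a
  // corresponding IdealState are the ones to drop, so no extra
  // TASK_RESOURCES_TO_DROP attribute would be needed.
  static Set<String> resourcesToDrop(Set<String> resourcesToRebalance,
      Set<String> idealStateNames) {
    Set<String> toDrop = new HashSet<>(resourcesToRebalance);
    toDrop.removeAll(idealStateNames);
    return toDrop;
  }

  public static void main(String[] args) {
    Set<String> rebalance = new HashSet<>(Set.of("wf1", "job1", "staleJob"));
    Set<String> idealStates = Set.of("wf1", "job1");
    System.out.println(resourcesToDrop(rebalance, idealStates)); // prints "[staleJob]"
  }
}
```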

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -98,6 +102,15 @@ public void process(ClusterEvent event) throws Exception {
       }
     }
 
+    // Add TaskFramework resources from workflow and job configs as Task Framework will no longer
+    // use IdealState
+    if (isTaskCache) {
+      WorkflowControllerDataProvider taskDataCache =
+          event.getAttribute(AttributeName.ControllerDataProvider.name());
+      processWorkflowConfigs(taskDataCache, resourceMap, resourceToRebalance);
+      processJobConfigs(taskDataCache, resourceMap, resourceToRebalance, idealStates);
+    }
+
     // It's important to get partitions from CurrentState as well since the
     // idealState might be removed.
     Map<String, LiveInstance> availableInstances = cache.getLiveInstances();

Review comment:
       Suggestion: move the following logic that processes the CurrentState into a separate method.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -98,6 +102,15 @@ public void process(ClusterEvent event) throws Exception {
       }
     }

Review comment:
       It's hard to comment precisely with the GitHub review UI, but with your change, I think the whole logic is cleaner: it either reads the IdealState for regular resources or reads the task config for task resources.
   In this case, could you please consider the following changes?
   1. Create a TaskResourceComputationStage, because the reused logic has been reduced quite a lot. It is no longer reasonable to put all the logic in one class.
   2. If another class is too much, we should at least make the workflow clean enough to fit the following structure:
   if (taskCache) {
     // process TF Resource Objects...
   } else {
     // process regular Resource Objects...
   }
   // Read the current state to backfill any resources that need to be removed.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -94,6 +95,8 @@ public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
           workflowResource, jobName, workflowState, jobState));
       finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      // New pipeline trigger for workflow status update

Review comment:
       nit, "job status update"?

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -519,6 +520,8 @@ protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx,
     _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT);
     _rebalanceScheduler.removeScheduledRebalance(jobResource);
     TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+    // New pipeline trigger for workflow status update
+    RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(),0L,false);

Review comment:
       2 questions.
   1. I assume the IS change will trigger the pipeline. But obviously, if the TF logic says goodbye to IS, then this won't trigger the right pipeline anymore.
   2. In this case, shall we just listen to the context change? Depending on the onDemand pipeline is not a scalable solution in general.
   
   Please correct me if I misunderstood the first point.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -130,11 +143,15 @@ public void process(ClusterEvent event) throws Exception {
             resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
             resource.setBucketSize(currentState.getBucketSize());
             resource.setBatchMessageMode(currentState.getBatchMessageMode());
-            if (resource.getStateModelDefRef() == null && !isTaskCache
-                || resource.getStateModelDefRef() != null && (
-                resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-                    || !resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)
-                    && !isTaskCache)) {
+            if (!isTaskCache && (resource.getStateModelDefRef() == null
+                || !TaskConstants.STATE_MODEL_NAME.equals(resource.getStateModelDefRef()))) {
+              resourceToRebalance.put(resourceName, resource);

Review comment:
       1. I think taskResourcesToDrop is not necessary, since we should be able to tell in the dispatch stage.
   2. It seems resourceToRebalance gets populated either way, so this may be redundant code.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -185,4 +250,21 @@ private void addPartition(String partition, String resourceName, Map<String, Res
     resource.addPartition(partition);
 
   }
+
+  private void addResourceConfigToResourceMap(String resourceName, ResourceConfig resourceConfig,
+      ClusterConfig clusterConfig, Map<String, Resource> resourceMap,
+      Map<String, Resource> resourceToRebalance) {
+    Resource resource = new Resource(resourceName, clusterConfig, resourceConfig);
+    resourceMap.put(resourceName, resource);

Review comment:
       I see a lot of duplicated code here. Could you please check whether we can reuse the same code as the regular resource addition?
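
   As a rough illustration of the deduplication (a hypothetical sketch with a stand-in Resource type; the real Helix Resource constructor takes cluster and resource configs), both the regular path and the task path could share one helper for the "create, register, optionally mark for rebalance" steps:

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceMapHelper {
  // Minimal stand-in for org.apache.helix.model.Resource (hypothetical).
  static class Resource {
    final String name;
    Resource(String name) { this.name = name; }
  }

  // Sketch of a single shared helper both branches could call, so the
  // regular-resource and task-resource paths do not duplicate the
  // "create Resource, register it, mark it for rebalance" steps.
  static Resource addResource(String resourceName, boolean rebalance,
      Map<String, Resource> resourceMap, Map<String, Resource> resourceToRebalance) {
    Resource resource = new Resource(resourceName);
    resourceMap.put(resourceName, resource);
    if (rebalance) {
      resourceToRebalance.put(resourceName, resource);
    }
    return resource;
  }

  public static void main(String[] args) {
    Map<String, Resource> all = new HashMap<>();
    Map<String, Resource> rebalance = new HashMap<>();
    addResource("db0", true, all, rebalance);
    addResource("job1", false, all, rebalance);
    System.out.println(all.size() + " " + rebalance.size()); // prints "2 1"
  }
}
```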

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -74,9 +79,8 @@ public void process(ClusterEvent event) throws Exception {
               cache.getResourceConfig(resourceName));
           resourceMap.put(resourceName, resource);
 
-          if (!idealState.isValid() && !isTaskCache
-              || idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-              || !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
+          if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()

Review comment:
       I'm very curious: why do we need to rebalance an invalid IS node?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
##########
@@ -110,117 +104,17 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
       restOfResources.remove(jobName);
     }
 
-    // Current rest of resources including: only current state left over ones
-    // Original resource map contains workflows + jobs + other invalid resources
-    // After removing workflows + jobs, only leftover ones will go over old rebalance pipeline.
-    for (Resource resource : restOfResources.values()) {
-      if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
-        failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId,
-            "Failed to calculate best possible states for " + resource.getResourceName());
-      }
+    Map<String, Resource> taskResourcesToDrop =
+        event.getAttribute(AttributeName.TASK_RESOURCES_TO_DROP.name());
+    for (String resourceName : taskResourcesToDrop.keySet()) {
+      ResourceAssignment emptyAssignment =
+          _workflowDispatcher.buildEmptyAssignment(resourceName, currentStateOutput);
+      _workflowDispatcher.updateBestPossibleStateOutput(resourceName, emptyAssignment, output);

Review comment:
       Can this logic be handled inside the workflow dispatcher, just like in the job dispatcher? The current code breaks the workflow dispatcher's OO design and lets the Stage class take on the dispatcher's responsibility.
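
   A rough sketch of the suggested encapsulation (hypothetical stand-in types and method names, not the actual Helix API): the dispatcher owns the "drop stale task resources" step, and the Stage only delegates to it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DispatcherSketch {
  // Minimal stand-in for a ResourceAssignment (hypothetical).
  static class ResourceAssignment {
    final String resource;
    ResourceAssignment(String resource) { this.resource = resource; }
  }

  // Sketch: the dispatcher itself owns the drop logic, mirroring how the
  // job dispatcher handles its own cleanup, so the Stage class only delegates.
  static class WorkflowDispatcher {
    void dropStaleResources(Iterable<String> resourcesToDrop,
        Map<String, ResourceAssignment> output) {
      for (String name : resourcesToDrop) {
        // Record an empty assignment so the stale resource gets dropped.
        output.put(name, new ResourceAssignment(name));
      }
    }
  }

  public static void main(String[] args) {
    Map<String, ResourceAssignment> output = new HashMap<>();
    new WorkflowDispatcher().dropStaleResources(List.of("wf1", "job1"), output);
    System.out.println(output.size()); // prints "2"
  }
}
```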




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
