jiajunwang commented on a change in pull request #1326:
URL: https://github.com/apache/helix/pull/1326#discussion_r483347389
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -74,9 +79,8 @@ public void process(ClusterEvent event) throws Exception {
            cache.getResourceConfig(resourceName));
        resourceMap.put(resourceName, resource);
-        if (!idealState.isValid() && !isTaskCache
-            || idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-            || !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
+        if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()
Review comment:
Hmm... the old condition was not readable, and even the new one is still confusing.
Could you please add a comment here for future readers?
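To make the ask concrete, here is a minimal, self-contained sketch of the kind of comment I have in mind: spell out in words which resources this branch picks up, right next to the boolean logic. All names below are mine, not from the PR; the three booleans stand in for the real checks.

```java
public class ConditionSketch {
  // Regular (non-task) pipeline handles a resource when its IdealState is
  // invalid (so it can be cleaned up), or when it is not a Task resource.
  // The task pipeline never takes this branch.
  static boolean regularPipelineHandles(boolean isTaskCache, boolean idealStateValid,
      boolean isTaskStateModel) {
    return !isTaskCache && (!idealStateValid || !isTaskStateModel);
  }
}
```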
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
##########
@@ -22,6 +22,7 @@
public enum AttributeName {
RESOURCES,
RESOURCES_TO_REBALANCE,
+ TASK_RESOURCES_TO_DROP,
Review comment:
Is it possible to just check in the task dispatch stage and remove all the
resources in RESOURCES_TO_REBALANCE that have no IdealState?
Then we wouldn't need this attribute.
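A minimal sketch of the alternative I'm suggesting, with types simplified to a `Map<String, Boolean>` (resource name -> has IdealState); the method name and shapes are illustrative only, not APIs from this PR. The dispatch stage would derive the drop set by filtering, instead of carrying a separate attribute through the pipeline.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class DropSetSketch {
  // Derive the resources to drop from RESOURCES_TO_REBALANCE itself:
  // anything with no IdealState is a drop candidate.
  static Set<String> resourcesToDrop(Map<String, Boolean> resourceHasIdealState) {
    return resourceHasIdealState.entrySet().stream()
        .filter(e -> !e.getValue())
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }
}
```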
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -98,6 +102,15 @@ public void process(ClusterEvent event) throws Exception {
      }
    }
+    // Add TaskFramework resources from workflow and job configs as Task Framework will no longer
+    // use IdealState
+    if (isTaskCache) {
+      WorkflowControllerDataProvider taskDataCache =
+          event.getAttribute(AttributeName.ControllerDataProvider.name());
+      processWorkflowConfigs(taskDataCache, resourceMap, resourceToRebalance);
+      processJobConfigs(taskDataCache, resourceMap, resourceToRebalance, idealStates);
+    }
+
    // It's important to get partitions from CurrentState as well since the
    // idealState might be removed.
    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
Review comment:
Suggest moving the CurrentState-processing logic that follows into a separate method.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -98,6 +102,15 @@ public void process(ClusterEvent event) throws Exception {
}
}
Review comment:
It's hard to comment precisely with the GitHub review UI, but with your change I
think the whole logic is cleaner: it either reads IdealState for regular
resources or reads the Task configs for task resources.
In this case, could you please consider the following changes?
1. Create a TaskResourceComputationStage, because the shared logic has been
reduced quite a lot and it is no longer reasonable to keep all the logic in one
class.
2. If another class is too much, we should at least make the flow clean enough
to fit the following structure:
if (isTaskCache) {
  // process TF Resource objects...
} else {
  // process regular Resource objects...
}
// Read the current state to backfill any resources that need to be removed.
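A toy, testable rendering of that structure (method names are placeholders I made up, not APIs from this PR): one branch per resource kind, followed by an unconditional CurrentState backfill step.

```java
import java.util.ArrayList;
import java.util.List;

public class StageFlowSketch {
  // Returns the ordered steps the stage would run, to make the two-branch
  // shape explicit: exactly one processing branch, then the backfill.
  static List<String> process(boolean isTaskCache) {
    List<String> steps = new ArrayList<>();
    if (isTaskCache) {
      steps.add("processTaskResources");    // read Workflow/Job configs
    } else {
      steps.add("processRegularResources"); // read IdealStates
    }
    steps.add("backfillFromCurrentState");  // always runs, to catch removed resources
    return steps;
  }
}
```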
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -94,6 +95,8 @@ public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
          workflowResource, jobName, workflowState, jobState));
      finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      // New pipeline trigger for workflow status update
Review comment:
nit: should this say "job status update"?
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -519,6 +520,8 @@ protected void handleJobTimeout(JobContext jobCtx, WorkflowContext workflowCtx,
    _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.TIMED_OUT);
    _rebalanceScheduler.removeScheduledRebalance(jobResource);
    TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+    // New pipeline trigger for workflow status update
+    RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false);
Review comment:
Two questions:
1. I assume the IdealState change currently triggers the pipeline. But
obviously, if the Task Framework logic says goodbye to IdealState, this won't
trigger the right pipeline anymore.
2. In that case, shall we just listen to the context change instead? Depending
on the onDemand pipeline is not a scalable solution in general.
Please correct me if I misunderstood the first point.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -130,11 +143,15 @@ public void process(ClusterEvent event) throws Exception {
          resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
          resource.setBucketSize(currentState.getBucketSize());
          resource.setBatchMessageMode(currentState.getBatchMessageMode());
-          if (resource.getStateModelDefRef() == null && !isTaskCache
-              || resource.getStateModelDefRef() != null && (
-              resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-              || !resource.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)
-              && !isTaskCache)) {
+          if (!isTaskCache && (resource.getStateModelDefRef() == null
+              || !TaskConstants.STATE_MODEL_NAME.equals(resource.getStateModelDefRef()))) {
+            resourceToRebalance.put(resourceName, resource);
Review comment:
1. I think taskResourcesToDrop is not necessary, since we should be able to
tell in the dispatch stage.
2. It seems resourceToRebalance is populated in every branch anyway, so this
may be redundant code.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -185,4 +250,21 @@ private void addPartition(String partition, String resourceName, Map<String, Res
    resource.addPartition(partition);
  }
+
+  private void addResourceConfigToResourceMap(String resourceName, ResourceConfig resourceConfig,
+      ClusterConfig clusterConfig, Map<String, Resource> resourceMap,
+      Map<String, Resource> resourceToRebalance) {
+    Resource resource = new Resource(resourceName, clusterConfig, resourceConfig);
+    resourceMap.put(resourceName, resource);
Review comment:
I see a lot of duplicated code here. Could you please check whether we can
reuse the same code for the regular resource addition?
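To sketch what I mean by reuse: both regular and task resources could funnel through one helper that fills both maps, with a flag (or an overload) deciding whether the resource also goes into resourceToRebalance. Types are simplified to `Object` and all names here are illustrative, not from the PR.

```java
import java.util.Map;

public class SharedAddSketch {
  // Single shared addition path for regular and task resources.
  static void addResource(String name, Object resource, Map<String, Object> resourceMap,
      Map<String, Object> resourceToRebalance, boolean rebalance) {
    resourceMap.put(name, resource);
    if (rebalance) {
      resourceToRebalance.put(name, resource);
    }
  }
}
```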
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -74,9 +79,8 @@ public void process(ClusterEvent event) throws Exception {
            cache.getResourceConfig(resourceName));
        resourceMap.put(resourceName, resource);
-        if (!idealState.isValid() && !isTaskCache
-            || idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && isTaskCache
-            || !idealState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME) && !isTaskCache) {
+        if (!isTaskCache && (!idealState.isValid() || !idealState.getStateModelDefRef()
Review comment:
I'm very curious why we need to rebalance a resource with an invalid IdealState node?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
##########
@@ -110,117 +104,17 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
      restOfResources.remove(jobName);
    }
-    // Current rest of resources including: only current state left over ones
-    // Original resource map contains workflows + jobs + other invalid resources
-    // After removing workflows + jobs, only leftover ones will go over old rebalance pipeline.
-    for (Resource resource : restOfResources.values()) {
-      if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
-        failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId,
-            "Failed to calculate best possible states for " + resource.getResourceName());
-      }
+    Map<String, Resource> taskResourcesToDrop =
+        event.getAttribute(AttributeName.TASK_RESOURCES_TO_DROP.name());
+    for (String resourceName : taskResourcesToDrop.keySet()) {
+      ResourceAssignment emptyAssignment =
+          _workflowDispatcher.buildEmptyAssignment(resourceName, currentStateOutput);
+      _workflowDispatcher.updateBestPossibleStateOutput(resourceName, emptyAssignment, output);
Review comment:
Can this logic be handled inside the workflow dispatcher just like the
job dispatcher? The current code breaks workflow dispatcher OO design and lets
the Stage class takes the responsibility of the dispatcher.
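A toy sketch of pushing the drop handling into the dispatcher, so the stage makes one call instead of building assignments itself. The method name is hypothetical, and a plain String stands in for the real buildEmptyAssignment / updateBestPossibleStateOutput plumbing; none of these names are from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DispatcherSketch {
  // Stand-in for the BestPossibleStateOutput the dispatcher writes into.
  final Map<String, String> bestPossibleOutput = new HashMap<>();

  // Hypothetical dispatcher-owned method: drop each resource by writing an
  // empty assignment into the output, keeping the logic out of the Stage.
  void dropTaskResources(Set<String> resourcesToDrop) {
    for (String resource : resourcesToDrop) {
      bestPossibleOutput.put(resource, "EMPTY_ASSIGNMENT");
    }
  }
}
```

The stage would then reduce to `_workflowDispatcher.dropTaskResources(taskResourcesToDrop.keySet())`, keeping assignment construction behind the dispatcher's interface.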
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]