NealSun96 commented on a change in pull request #1422:
URL: https://github.com/apache/helix/pull/1422#discussion_r500469673



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -122,30 +122,37 @@ public void updatePreviousAssignedTasksStatus(
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
-        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+        TaskPartitionState currState = getTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
-        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
-          LOG.warn(
-              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
-              instance, pId);
-          continue;
-        }
-
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
+        // If there is a pending message, we should not continue to update the context because from
+        // controller prospective, state transition has not been completed yet if pending message
+        // still existed.
+        // If context gets updated here, controller might remove the job from RunTimeJobDAG which
+        // can cause the task's CurrentState not being removed when there is a pending message for
+        // that task.
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {

Review comment:
   Thanks for explaining. So it looks like the problem has two parts:
   1. The job context is updated while there are pending messages. This can
   cause the job to be removed, since it is marked as completed. If the DROPPED
   message isn't correctly sent at this point, it will never be sent, because
   the job has been deleted;
   2. The current state gets updated but the pending message isn't removed (an
   async problem). In that case, the old if statement would not catch the
   problem and would proceed as if the message didn't exist.
   I suppose 2 is not a problem except when the job is about to be deleted; if
   the job isn't deleted, the pipeline can simply retry next time.
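
To make part 2 concrete, here is a minimal standalone sketch of how I read the old condition. `PendingMessage` is a hypothetical stand-in for Helix's `Message`, and `strictCheckBlocks` is only my assumption about a stricter guard, not necessarily what this PR implements:

```java
import java.util.Objects;

class PendingMessageCheckSketch {
  // Hypothetical stand-in for Helix's Message; only the target state matters here.
  static final class PendingMessage {
    final String toState;

    PendingMessage(String toState) {
      this.toState = Objects.requireNonNull(toState);
    }
  }

  // Mirrors the removed condition in the diff: block only when a pending
  // message exists AND its target state differs from the current state.
  static boolean oldCheckBlocks(PendingMessage pending, String currState) {
    return pending != null && !pending.toState.equals(currState);
  }

  // Assumed stricter variant: any pending message blocks further processing.
  static boolean strictCheckBlocks(PendingMessage pending) {
    return pending != null;
  }

  public static void main(String[] args) {
    // Current state already reached COMPLETED, but the message is still pending.
    PendingMessage stale = new PendingMessage("COMPLETED");
    System.out.println(oldCheckBlocks(stale, "COMPLETED")); // false: message is ignored
    System.out.println(strictCheckBlocks(stale));           // true: message still blocks
  }
}
```

With the old check, a message whose target state already matches the current state is treated as if it did not exist, which is the case described in part 2.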
   
   For 1, I don't understand why the new code in `TaskSchedulingStage` doesn't 
catch it. Specifically:
   ```
       // Jobs that exist in current states but are missing corresponding JobConfigs or WorkflowConfigs
       // or WorkflowContexts need to be cleaned up. Note that restOfResources can only be jobs,
       // because workflow resources are created based on Configs only - workflows don't have
       // CurrentStates
       for (String resourceName : restOfResources.keySet()) {
         _workflowDispatcher.processJobForDrop(resourceName, currentStateOutput, output);
       }
   ```
   



