alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r430580638



##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @narendly Thank you for mentioning these corner cases.
   
   I went through the logic and I am confident the code handles this corner
case as well. Let me explain why (please note that with this PR, when a task
reaches a terminal state, we first drop the task and then reschedule it in the
next pipeline):
   
   Here are the steps:
   1- The participant sets the requested state to TASK_ERROR.
   2- The controller sends RUNNING -> TASK_ERROR to the participant.
   3- The participant processes the message and sets the current state to
TASK_ERROR.
   4- The controller sees this current state (i.e. TASK_ERROR) and sends a
TASK_ERROR -> DROPPED message.
   5- The participant gets the message and drops/removes the current state.
   6- The controller no longer sees a current state (because it was removed),
so it sets the context of the task to INIT and schedules the task again on the
participant by sending an INIT -> RUNNING message (this happens in the
handleAdditionalTaskAssignment method).
   * Note that the controller by itself does not send RUNNING -> COMPLETED,
because the COMPLETED state must be requested by the participant. The
participant requests either TASK_ERROR or COMPLETED by setting the requested
state in its current state.
   7- If the task goes to the error state again, we see this delta again
(because the context was set to INIT before the task was rescheduled) and mark
it ERROR with a new finish time.
   
   Two points help the controller handle this corner case:
   1- The controller will not send RUNNING -> COMPLETED unless the
participant's requested state is COMPLETED. So if the task goes to TASK_ERROR
on the participant, the requested state will be TASK_ERROR, and the controller
always respects the requested state and sends RUNNING -> TASK_ERROR.
   2- Dropping the task in a terminal state makes the controller reschedule
the task with its jobContext state set to INIT. This lets the controller see
these deltas in future pipelines and update the information in jobContext
correctly.
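
   The steps above can be walked through with a small simulation. This is only
a sketch under stated assumptions, not the Helix implementation: `Ctx`,
`runPipeline` and the reduced `State` enum are hypothetical stand-ins for
JobContext, one controller pipeline run and TaskPartitionState, and passing a
null current state models the participant having dropped the task.

```java
import java.util.HashMap;
import java.util.Map;

class DeltaCheckSketch {
  // Reduced, hypothetical subset of the task states discussed above.
  enum State { INIT, RUNNING, COMPLETED, TASK_ERROR }

  // Hypothetical stand-in for the per-partition fields of the job context.
  static class Ctx {
    final Map<Integer, State> states = new HashMap<>();
    final Map<Integer, Long> finishTimes = new HashMap<>();
    int finishTimeWrites = 0;
  }

  static boolean isTerminal(State s) {
    return s == State.COMPLETED || s == State.TASK_ERROR;
  }

  // One simulated pipeline run. currentState is null once the participant
  // has dropped the task and removed its current state (step 5).
  static void runPipeline(Ctx ctx, int pId, State currentState, long now) {
    if (currentState == null) {
      // Step 6: no current state, so the context goes back to INIT.
      ctx.states.put(pId, State.INIT);
      return;
    }
    // Delta check: touch the context only when it is out of date.
    if (!currentState.equals(ctx.states.get(pId))) {
      ctx.states.put(pId, currentState);
      if (isTerminal(currentState)) {
        ctx.finishTimes.put(pId, now);
        ctx.finishTimeWrites++;
      }
    }
  }

  public static void main(String[] args) {
    Ctx ctx = new Ctx();
    runPipeline(ctx, 0, State.RUNNING, 1L);
    runPipeline(ctx, 0, State.TASK_ERROR, 2L); // first delta: finish time 2
    runPipeline(ctx, 0, State.TASK_ERROR, 3L); // unchanged: nothing written
    runPipeline(ctx, 0, null, 4L);             // dropped: context reset to INIT
    runPipeline(ctx, 0, State.RUNNING, 5L);    // rescheduled
    runPipeline(ctx, 0, State.TASK_ERROR, 6L); // second delta: finish time 6
    System.out.println(ctx.finishTimes.get(0) + " " + ctx.finishTimeWrites);
  }
}
```

   In this toy model the finish time is written exactly twice, once per
TASK_ERROR delta, and the unchanged TASK_ERROR seen at time 3 leaves it alone.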
   

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       @narendly 
   
   Delayed scheduling was working because we were relying on
previousAssignment. Suppose we rely on previousAssignment and decide to send
the task to TASK_ERROR. The task is recorded in previousAssignment, and in the
next pipeline we iterate through the tasks that exist in the previous
assignment and record the finish time and context information. After that, the
task no longer exists in the previousAssignment of future pipelines, so we do
not update the finish time again. (Actually, some of the tests guided me
toward this change.)
   
   However, if we want to be independent of previousAssignment and rely on
currentState (which also means we are relying on the participant's reactions),
we need to consider that currentState might not change for several pipelines.
In this case, it is necessary to monitor these deltas and avoid updating the
context information (especially time-sensitive information) multiple times.
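
   To illustrate why the delta matters once we rely on currentState, here is a
minimal sketch, not the actual dispatcher code. `TaskRecord`, `updateAlways`
and `updateOnDelta` are hypothetical names, and the timestamps are made up. It
feeds the same TASK_ERROR current state to both update styles over three
pipelines.

```java
class FinishTimeDriftSketch {
  // Hypothetical stand-in for the context entry of a single task.
  static class TaskRecord {
    String state = "INIT";
    long finishTime = -1L;
  }

  // Unconditional update: rewrites the context on every pipeline run.
  static void updateAlways(TaskRecord r, String currentState, long now) {
    r.state = currentState;
    if ("TASK_ERROR".equals(currentState)) {
      r.finishTime = now;
    }
  }

  // Delta-guarded update: rewrites the context only when the state changed.
  static void updateOnDelta(TaskRecord r, String currentState, long now) {
    if (currentState.equals(r.state)) {
      return; // context is already up to date, keep the recorded finish time
    }
    r.state = currentState;
    if ("TASK_ERROR".equals(currentState)) {
      r.finishTime = now;
    }
  }

  public static void main(String[] args) {
    TaskRecord always = new TaskRecord();
    TaskRecord onDelta = new TaskRecord();
    // The participant stays in TASK_ERROR for three consecutive pipelines.
    for (long now : new long[] {100L, 200L, 300L}) {
      updateAlways(always, "TASK_ERROR", now);
      updateOnDelta(onDelta, "TASK_ERROR", now);
    }
    System.out.println("always=" + always.finishTime + " onDelta=" + onDelta.finishTime);
  }
}
```

   With the unconditional update the finish time drifts to the last pipeline
(300), while the guarded update keeps the time of the first observation (100).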

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       I double-checked, and it seems the only way the controller sends a
COMPLETED message to the participant is when the participant initiates it by
setting the requested state to COMPLETED.
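
   The rule can be sketched as a small decision function. This is a
hypothetical illustration under assumptions, not the controller's actual
message-generation code: `nextTarget` and the reduced `State` enum are made up
for this example, and a null `requested` value models a participant that has
not requested any state.

```java
import java.util.Optional;

class RequestedStateSketch {
  // Reduced, hypothetical subset of the task states discussed above.
  enum State { INIT, RUNNING, COMPLETED, TASK_ERROR, DROPPED }

  // Returns the state the controller would ask the participant to move to,
  // or empty when no message would be sent for this task.
  static Optional<State> nextTarget(State current, State requested) {
    if (current == State.RUNNING) {
      // A RUNNING task only leaves RUNNING toward a terminal state when the
      // participant itself requested that terminal state.
      if (requested == State.COMPLETED || requested == State.TASK_ERROR) {
        return Optional.of(requested);
      }
      return Optional.empty();
    }
    if (current == State.COMPLETED || current == State.TASK_ERROR) {
      // Terminal states are dropped first, as described for this PR.
      return Optional.of(State.DROPPED);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(nextTarget(State.RUNNING, null));
    System.out.println(nextTarget(State.RUNNING, State.COMPLETED));
    System.out.println(nextTarget(State.TASK_ERROR, null));
  }
}
```

   In this sketch a RUNNING task with no requested state yields no message, so
RUNNING -> COMPLETED can only originate from the participant.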




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


