narendly commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r430049218



##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context
+    // information because it will prevent controller to stuck in race 
condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information

Review comment:
       @alirezazamani -
   
   Okay, I think your point about not updating when there's no delta is valid, 
but I'm not sure if what you said about delayed scheduling is entirely true. 
Delayed scheduling of tasks is a feature that was previously working, right? If 
what you said about it was the case, then how come our delayed scheduling 
feature was working all this time?

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @alirezazamani Another concern here:
   
   Is it possible for a task to be retried so fast that it ends up being in the 
same state? For example,
   
   task_error -> (controller reschedules it) -> (controller sends messages, 
error -> init, init -> running, running -> complete) -> participant processes 
message so quickly but it goes into task_error again, and by the time 
controller gets to this task, the states are the same as the previous run. 
Basically, controller sees task_error ->  task_error.
   
   Is that going to be an issue? I guess partition id is set before this check 
so that will be updated accordingly, but what about finish time or the time at 
which the task was marked as error?

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @alirezazamani Another concern here:
   
   Is it possible for a task to be retried so fast (on the participant side) 
that it ends up being in the same state? For example,
   
   task_error -> (controller reschedules it) -> (controller sends messages, 
error -> init, init -> running, running -> complete) -> participant processes 
message so quickly but it goes into task_error again, and by the time 
controller gets to this task, the states are the same as the previous run. 
Basically, controller sees task_error ->  task_error.
   
   Is that going to be an issue? I guess partition id is set before this check 
so that will be updated accordingly, but what about finish time or the time at 
which the task was marked as error?

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       > 1- controller will not send running -> complete unless participant 
requested state is COMPLETE
   
   Are you sure this is true? Can you verify that init -> running / running -> 
complete are not sent out at the same time? The target state during message 
generation phase should be complete if I recall correctly.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context
+    // information because it will prevent controller to stuck in race 
condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information

Review comment:
       Thanks for the explanation. The reason we need to know if CurrentState 
changed is because we no longer have prevAssignment.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       Sounds good. Overall I think this analysis is sound. Let's make sure we 
do a careful round of  E2E testing as well as load testing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to