narendly commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r430049218
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current state and context
+ // information because it will prevent controller to stuck in race condition while there is two
+ // current states. In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
Review comment:
@alirezazamani -
Okay, I think your point about not updating when there's no delta is valid,
but I'm not sure that what you said about delayed scheduling is entirely true.
Delayed scheduling of tasks is a feature that was already working, right? If
what you described were the case, how has our delayed scheduling feature been
working all this time?
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assignedParticipant field needs to be updated regardless of the current state and context
+ // information because it will prevent controller to assign the task to the wrong participant
+ // for targeted tasks when two CurrentStates exist for one task.
+ // In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
+ // (specially finish time) once.
+ // This condition checks whether jobContext's state is out of date or not.
+ if (!currentState.equals(jobCtx.getPartitionState(pId))) {
Review comment:
@alirezazamani Another concern here:
Is it possible for a task to be retried so fast (on the participant side)
that it ends up in the same state? For example:
task_error -> (controller reschedules it) -> (controller sends messages:
error -> init, init -> running, running -> complete) -> participant processes
the messages so quickly that the task goes into task_error again, and by the
time the controller gets to this task, the state is the same as in the
previous run. Basically, the controller sees task_error -> task_error.
Is that going to be an issue? I guess partition id is set before this check,
so that will be updated accordingly, but what about the finish time, or the
time at which the task was marked as error?
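To make the scenario above concrete, here is a minimal sketch of a state-delta check of the kind being discussed. It is not Helix code: `StateDeltaSketch`, `Context`, `State`, and `updateIfChanged` are hypothetical stand-ins, and it assumes the finish time is only recorded inside the "state changed" branch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a state-delta check; not a Helix class. */
public class StateDeltaSketch {
  enum State { INIT, RUNNING, COMPLETED, TASK_ERROR }

  /** Stand-in for the per-partition fields of a job context. */
  static class Context {
    final Map<Integer, State> state = new HashMap<>();
    final Map<Integer, Long> finishTime = new HashMap<>();
  }

  /** Updates the context only when the observed state differs; returns true if it did. */
  static boolean updateIfChanged(Context ctx, int pId, State observed, long now) {
    if (observed.equals(ctx.state.get(pId))) {
      return false; // no delta: the finish time from the earlier run is kept
    }
    ctx.state.put(pId, observed);
    // Assumption: finish time is recorded only for terminal states.
    if (observed == State.COMPLETED || observed == State.TASK_ERROR) {
      ctx.finishTime.put(pId, now);
    }
    return true;
  }

  public static void main(String[] args) {
    Context ctx = new Context();
    // First attempt fails and the controller observes it at t=100.
    updateIfChanged(ctx, 0, State.TASK_ERROR, 100L);
    // The retry fails again before the controller's next pass at t=200;
    // the intermediate INIT and RUNNING states were never observed.
    boolean updated = updateIfChanged(ctx, 0, State.TASK_ERROR, 200L);
    System.out.println("updated=" + updated + ", finishTime=" + ctx.finishTime.get(0));
  }
}
```

Under these assumptions the second call is a no-op, so the recorded finish time still belongs to the first failed attempt. Whether the real controller can actually miss the intermediate states is exactly the open question in this thread.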
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assignedParticipant field needs to be updated regardless of the current state and context
+ // information because it will prevent controller to assign the task to the wrong participant
+ // for targeted tasks when two CurrentStates exist for one task.
+ // In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
+ // (specially finish time) once.
+ // This condition checks whether jobContext's state is out of date or not.
+ if (!currentState.equals(jobCtx.getPartitionState(pId))) {
Review comment:
> 1- controller will not send running -> complete unless participant requested state is COMPLETE

Are you sure this is true? Can you verify that init -> running and running ->
complete are not sent out at the same time? If I recall correctly, the target
state during the message generation phase should be complete.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current state and context
+ // information because it will prevent controller to stuck in race condition while there is two
+ // current states. In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
Review comment:
Thanks for the explanation. The reason we need to know whether CurrentState
changed is that we no longer have prevAssignment.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assignedParticipant field needs to be updated regardless of the current state and context
+ // information because it will prevent controller to assign the task to the wrong participant
+ // for targeted tasks when two CurrentStates exist for one task.
+ // In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
+ // (specially finish time) once.
+ // This condition checks whether jobContext's state is out of date or not.
+ if (!currentState.equals(jobCtx.getPartitionState(pId))) {
Review comment:
Sounds good. Overall I think this analysis is sound. Let's make sure we
do a careful round of E2E testing as well as load testing.