alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r426807350
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -99,16 +99,15 @@ public void updatePreviousAssignedTasksStatus(
}
// If not an excluded instance, we must instantiate its entry in
assignedPartitions
- Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+ Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
// We need to remove all task pId's to be dropped because we already
made an assignment in
// paMap above for them to be dropped. The following does this.
if (tasksToDrop.containsKey(instance)) {
pSet.removeAll(tasksToDrop.get(instance));
}
- // Used to keep track of partitions that are in one of the final states:
COMPLETED, TIMED_OUT,
- // TASK_ERROR, ERROR.
+ // Used to keep track of partitions that are in one the INIT or DROPPED
states.
Review comment:
Yes. Done.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
}
break;
case COMPLETED: {
- // The task has completed on this partition. Mark as such in the
context object.
- donePartitions.add(pId);
+ // The task has completed on this partition. Drop it from the
instance and add it to assignedPartitions in
+ // order to avoid scheduling it again in this pipeline.
Review comment:
The reason is that this PR changes the behavior. The task still has an
assignment on this instance, and that assignment is now DROPPED, so the pId is
added to assignedPartitions. Once the participant has dropped it, we no longer
consider it when computing new assignments.
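To make the intent concrete, here is a minimal sketch of the idea. It is not
the actual dispatcher code: the class, the State enum and freePartitions are
hypothetical stand-ins, and only the COMPLETED handling is shown.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified illustration only; names and types are hypothetical, not Helix classes.
class CompletedTaskDropSketch {
  enum State { COMPLETED, DROPPED }

  // Marks every COMPLETED pId as DROPPED in paMap, records it as assigned,
  // and returns the pIds that remain free for scheduling in this pipeline run.
  static Set<Integer> freePartitions(Set<Integer> allPIds, Map<Integer, State> currentStates,
      Set<Integer> assignedPartitions, Map<Integer, State> paMap) {
    for (Map.Entry<Integer, State> entry : currentStates.entrySet()) {
      if (entry.getValue() == State.COMPLETED) {
        // Ask the participant to drop the finished task.
        paMap.put(entry.getKey(), State.DROPPED);
        // Treat the pId as assigned so it is not picked again in this run.
        assignedPartitions.add(entry.getKey());
      }
    }
    Set<Integer> free = new TreeSet<>(allPIds);
    free.removeAll(assignedPartitions);
    return free;
  }

  public static void main(String[] args) {
    Map<Integer, State> currentStates = new HashMap<>();
    currentStates.put(0, State.COMPLETED);
    Map<Integer, State> paMap = new HashMap<>();
    Set<Integer> assigned = new HashSet<>();
    Set<Integer> all = new TreeSet<>(Arrays.asList(0, 1, 2));
    System.out.println(freePartitions(all, currentStates, assigned, paMap));
  }
}
```

In this sketch pId 0 ends up DROPPED in paMap and is excluded from the free
set, which mirrors why the completed task is not scheduled again in the same
pipeline.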
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
}
break;
case COMPLETED: {
- // The task has completed on this partition. Mark as such in the
context object.
- donePartitions.add(pId);
+ // The task has completed on this partition. Drop it from the
instance and add it to assignedPartitions in
+ // order to avoid scheduling it again in this pipeline.
+ assignedPartitions.get(instance).add(pId);
+ paMap.put(pId, new PartitionAssignment(instance,
TaskPartitionState.DROPPED.name()));
Review comment:
You are right. I also had this concern. But note that we do not change the
state in JobContext to DROPPED. Say the controller sends a COMPLETED to
DROPPED message to the participant. The current state stays COMPLETED until the
participant processes the message. Once it is processed, the current state is
null, and that task is not considered again. So, to answer your question, it
depends entirely on the behavior of the participant. If the participant set the
current state to DROPPED, which is not the case here, your concern would be
valid. Since the participant removes the current state of the task, we do not
need to worry about that case.
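A rough sketch of that sequence, under the assumption that the controller only
copies a state into the context while a current state exists. The class and
method names are made up for illustration and are not Helix APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration only; this is not the Helix API.
class DropLifecycleSketch {
  // Participant side: pId -> current state. The entry is removed once a drop is processed.
  final Map<Integer, String> currentState = new HashMap<>();
  // Controller side: pId -> state recorded in the job context.
  final Map<Integer, String> jobContext = new HashMap<>();

  // The controller copies a state into the context only while a current state exists.
  void controllerPipeline() {
    jobContext.putAll(currentState);
  }

  // The participant handles COMPLETED -> DROPPED by deleting the current state.
  void participantProcessesDrop(int pId) {
    currentState.remove(pId);
  }

  public static void main(String[] args) {
    DropLifecycleSketch s = new DropLifecycleSketch();
    s.currentState.put(0, "COMPLETED");
    s.controllerPipeline();        // context records COMPLETED; the drop message goes out
    s.participantProcessesDrop(0); // current state disappears on the participant
    s.controllerPipeline();        // nothing left to copy, so the context is untouched
    System.out.println(s.jobContext.get(0) + " " + s.currentState.containsKey(0));
  }
}
```

The context keeps COMPLETED because DROPPED never shows up as a current state
in this model; the entry simply goes away.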
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -263,7 +251,11 @@ public void updatePreviousAssignedTasksStatus(
case TASK_ABORTED:
case ERROR: {
- donePartitions.add(pId); // The task may be rescheduled on a
different instance.
+ // First make this task which is in terminal state to be dropped.
+ // Later on, in next pipeline in handleAdditionalAssignments, the
task will be retried if possible.
+ // (meaning it is not ABORTED and max number of attempts has not
been reached yet)
+ assignedPartitions.get(instance).add(pId);
+ paMap.put(pId, new PartitionAssignment(instance,
TaskPartitionState.DROPPED.name()));
Review comment:
The answer to this question is similar to the COMPLETED case :). Please
see my reply above.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState
updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT :
stateFromContext;
}
TaskPartitionState currentState =
TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource,
currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task
information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput
currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx,
Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current
state and context
Review comment:
Fixed. Thanks.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState
updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT :
stateFromContext;
}
TaskPartitionState currentState =
TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource,
currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task
information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput
currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx,
Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current
state and context
+ // information because it will prevent controller to stuck in race
condition while there is two
Review comment:
The scenario I am referring to as a race condition is the one where a task has
two current states on different participants. Since we have only one slot for
each task in paMap, we want to make sure that the task is DROPPED from the
wrong instance and is assigned to the correct one. I am basically talking about
#922 and #461. Let me rephrase this comment to make it clear.
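As a simplified picture of the intended outcome (not the real dispatcher
logic), assume a hypothetical helper that looks at every instance holding a
current state for one pId. It ignores the one-slot constraint of paMap and only
shows which instance should keep the task.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration only; names are hypothetical, not Helix classes.
class DuplicateCurrentStateSketch {
  // For a single pId: maps each instance that reports a current state to the
  // state it should end up with. Only the participant recorded in the job
  // context keeps its state; every other instance is told to drop the task.
  static Map<String, String> targetStates(String assignedParticipant,
      Map<String, String> currentStateByInstance) {
    Map<String, String> targets = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : currentStateByInstance.entrySet()) {
      boolean isOwner = entry.getKey().equals(assignedParticipant);
      targets.put(entry.getKey(), isOwner ? entry.getValue() : "DROPPED");
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<String, String> current = new LinkedHashMap<>();
    current.put("instanceA", "RUNNING");
    current.put("instanceB", "RUNNING");
    System.out.println(targetStates("instanceA", current));
  }
}
```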
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState
updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT :
stateFromContext;
}
TaskPartitionState currentState =
TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource,
currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task
information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput
currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx,
Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current
state and context
+ // information because it will prevent controller to stuck in race
condition while there is two
+ // current states. In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the
assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
Review comment:
Exactly. We added this check in PR #923 to help resolve this scenario.
Maybe "race condition" is not the right term here. I rephrased the comments.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState
updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT :
stateFromContext;
}
TaskPartitionState currentState =
TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource,
currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task
information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput
currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx,
Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current
state and context
+ // information because it will prevent controller to stuck in race
condition while there is two
+ // current states. In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the
assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new
Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we
do not want to update
+ // context as long as current state existed. We just want to update
context information
+ // (specially finish time) once.
+ if (!currentState.equals(jobCtx.getPartitionState(pId))) {
Review comment:
Exactly. I added a comment about it.
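For reference, the guard boils down to something like the sketch below. The
context class is a hypothetical stand-in for JobContext, and recording the
finish time only for COMPLETED is an assumption made to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration only; a stand-in for the job context, not the Helix class.
class UpdateOnceSketch {
  final Map<Integer, String> partitionState = new HashMap<>();
  final Map<Integer, Long> finishTime = new HashMap<>();

  // Updates the context only when the observed state differs from the stored one.
  // Returns true if the context was changed.
  boolean update(int pId, String currentState, long now) {
    if (currentState.equals(partitionState.get(pId))) {
      // Same state as in an earlier pipeline run: keep the original finish time.
      return false;
    }
    partitionState.put(pId, currentState);
    if ("COMPLETED".equals(currentState)) {
      finishTime.put(pId, now);
    }
    return true;
  }

  public static void main(String[] args) {
    UpdateOnceSketch ctx = new UpdateOnceSketch();
    ctx.update(0, "COMPLETED", 100L);
    ctx.update(0, "COMPLETED", 200L); // current state still exists, so nothing changes
    System.out.println(ctx.finishTime.get(0));
  }
}
```

Without the equality check, every pipeline run that still sees the COMPLETED
current state would overwrite the finish time.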
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx,
Set<Integer> allPartitions,
/**
* @param liveInstances
- * @param prevAssignment task partition -> (instance -> state)
- * @param allTaskPartitions all task partitionIds
* @param currStateOutput currentStates to make sure currentStates copied
over expired sessions
* are accounted for
* @param jobName job name
* @param tasksToDrop instance -> pId's, to gather all pIds that need to be
dropped
* @return instance -> partitionIds from previous assignment, if the
instance is still live
*/
- protected static Map<String, SortedSet<Integer>>
getPrevInstanceToTaskAssignments(
- Iterable<String> liveInstances, ResourceAssignment prevAssignment,
- Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput,
String jobName,
+ protected static Map<String, SortedSet<Integer>>
getCurrentInstanceToTaskAssignments(
+ Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
String jobName,
Map<String, Set<Integer>> tasksToDrop) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
result.put(instance, new TreeSet<>());
}
- // First, add all task partitions from prevAssignment
- // TODO: Remove this portion to get rid of prevAssignment from Task
Framework
- for (Partition partition : prevAssignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
- if (allTaskPartitions.contains(pId)) {
- Map<String, String> replicaMap =
prevAssignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet()) {
- SortedSet<Integer> pIdSet = result.get(instance);
- if (pIdSet != null) {
- pIdSet.add(pId);
- }
- }
- }
- }
-
- // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source
of truth
-
- // Add all pIds existing in CurrentStateOutput as well because task
currentStates copied over
- // from previous sessions won't show up in prevInstanceToTaskAssignments
- // We need to add these back here in order for these task partitions to be
dropped (after a
- // copy-over, the Controller will send a message to drop the state
currentState)
- // partitions: (partition -> instance -> currentState)
+ // Generate currentInstanceToTaskAssignment with CurrentStateOutput as
source of truth
+ // Add all pIds existing in CurrentStateOutput
+ // We need to add these pIds here and make decision about the existing
tasks with current state
+ // in updatePreviousAssignedTasksStatus method
Review comment:
Fixed. Thanks.
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx,
Set<Integer> allPartitions,
/**
* @param liveInstances
- * @param prevAssignment task partition -> (instance -> state)
- * @param allTaskPartitions all task partitionIds
* @param currStateOutput currentStates to make sure currentStates copied
over expired sessions
* are accounted for
* @param jobName job name
* @param tasksToDrop instance -> pId's, to gather all pIds that need to be
dropped
* @return instance -> partitionIds from previous assignment, if the
instance is still live
*/
- protected static Map<String, SortedSet<Integer>>
getPrevInstanceToTaskAssignments(
- Iterable<String> liveInstances, ResourceAssignment prevAssignment,
- Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput,
String jobName,
+ protected static Map<String, SortedSet<Integer>>
getCurrentInstanceToTaskAssignments(
+ Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
String jobName,
Map<String, Set<Integer>> tasksToDrop) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
result.put(instance, new TreeSet<>());
}
- // First, add all task partitions from prevAssignment
- // TODO: Remove this portion to get rid of prevAssignment from Task
Framework
- for (Partition partition : prevAssignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
- if (allTaskPartitions.contains(pId)) {
- Map<String, String> replicaMap =
prevAssignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet()) {
- SortedSet<Integer> pIdSet = result.get(instance);
- if (pIdSet != null) {
- pIdSet.add(pId);
- }
- }
- }
- }
-
- // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source
of truth
-
- // Add all pIds existing in CurrentStateOutput as well because task
currentStates copied over
- // from previous sessions won't show up in prevInstanceToTaskAssignments
- // We need to add these back here in order for these task partitions to be
dropped (after a
- // copy-over, the Controller will send a message to drop the state
currentState)
- // partitions: (partition -> instance -> currentState)
+ // Generate currentInstanceToTaskAssignment with CurrentStateOutput as
source of truth
+ // Add all pIds existing in CurrentStateOutput
+ // We need to add these pIds here and make decision about the existing
tasks with current state
+ // in updatePreviousAssignedTasksStatus method
Map<Partition, Map<String, String>> partitions =
currStateOutput.getCurrentStateMap(jobName);
for (Map.Entry<Partition, Map<String, String>> entry :
partitions.entrySet()) {
// Get all (instance -> currentState) mappings
for (Map.Entry<String, String> instanceToCurrState :
entry.getValue().entrySet()) {
String instance = instanceToCurrState.getKey();
String requestedState =
currStateOutput.getRequestedState(jobName, entry.getKey(),
instance);
- TaskPartitionState currState =
TaskPartitionState.valueOf(instanceToCurrState.getValue());
int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
if (result.containsKey(instance)) {
- // We must add all active task pIds back here because dropping
transition could overwrite
- // an active transition in paMap
- // Add all task partitions in the following states:
- // currState = INIT, requestedState = RUNNING (bootstrap)
- // currState = RUNNING, requestedState = ANY (active)
- // ** for tasks that are just in INIT state, we do not add them here
because old
- // Participants, upon connection reset, set tasks' currentStates to
INIT. We cannot
- // consider those tasks active **
- if (currState == TaskPartitionState.INIT && requestedState != null
- && requestedState.equals(TaskPartitionState.RUNNING.name())
- || currState == TaskPartitionState.RUNNING) {
- result.get(instance).add(pId);
- }
-
+ // We must add all pIds back here
Review comment:
I agree. Removed the comment. Basically, we have explained everything in
the comments already.
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx,
Set<Integer> allPartitions,
/**
* @param liveInstances
- * @param prevAssignment task partition -> (instance -> state)
- * @param allTaskPartitions all task partitionIds
* @param currStateOutput currentStates to make sure currentStates copied
over expired sessions
* are accounted for
* @param jobName job name
* @param tasksToDrop instance -> pId's, to gather all pIds that need to be
dropped
* @return instance -> partitionIds from previous assignment, if the
instance is still live
*/
- protected static Map<String, SortedSet<Integer>>
getPrevInstanceToTaskAssignments(
- Iterable<String> liveInstances, ResourceAssignment prevAssignment,
- Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput,
String jobName,
+ protected static Map<String, SortedSet<Integer>>
getCurrentInstanceToTaskAssignments(
+ Iterable<String> liveInstances, CurrentStateOutput currStateOutput,
String jobName,
Map<String, Set<Integer>> tasksToDrop) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
result.put(instance, new TreeSet<>());
}
- // First, add all task partitions from prevAssignment
- // TODO: Remove this portion to get rid of prevAssignment from Task
Framework
- for (Partition partition : prevAssignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
- if (allTaskPartitions.contains(pId)) {
- Map<String, String> replicaMap =
prevAssignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet()) {
- SortedSet<Integer> pIdSet = result.get(instance);
- if (pIdSet != null) {
- pIdSet.add(pId);
- }
- }
- }
- }
-
- // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source
of truth
-
- // Add all pIds existing in CurrentStateOutput as well because task
currentStates copied over
- // from previous sessions won't show up in prevInstanceToTaskAssignments
- // We need to add these back here in order for these task partitions to be
dropped (after a
- // copy-over, the Controller will send a message to drop the state
currentState)
- // partitions: (partition -> instance -> currentState)
+ // Generate currentInstanceToTaskAssignment with CurrentStateOutput as
source of truth
+ // Add all pIds existing in CurrentStateOutput
+ // We need to add these pIds here and make decision about the existing
tasks with current state
+ // in updatePreviousAssignedTasksStatus method
Map<Partition, Map<String, String>> partitions =
currStateOutput.getCurrentStateMap(jobName);
for (Map.Entry<Partition, Map<String, String>> entry :
partitions.entrySet()) {
// Get all (instance -> currentState) mappings
for (Map.Entry<String, String> instanceToCurrState :
entry.getValue().entrySet()) {
String instance = instanceToCurrState.getKey();
String requestedState =
currStateOutput.getRequestedState(jobName, entry.getKey(),
instance);
- TaskPartitionState currState =
TaskPartitionState.valueOf(instanceToCurrState.getValue());
int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
if (result.containsKey(instance)) {
- // We must add all active task pIds back here because dropping
transition could overwrite
- // an active transition in paMap
- // Add all task partitions in the following states:
- // currState = INIT, requestedState = RUNNING (bootstrap)
- // currState = RUNNING, requestedState = ANY (active)
- // ** for tasks that are just in INIT state, we do not add them here
because old
- // Participants, upon connection reset, set tasks' currentStates to
INIT. We cannot
- // consider those tasks active **
- if (currState == TaskPartitionState.INIT && requestedState != null
- && requestedState.equals(TaskPartitionState.RUNNING.name())
- || currState == TaskPartitionState.RUNNING) {
- result.get(instance).add(pId);
- }
-
+ // We must add all pIds back here
+ result.get(instance).add(pId);
// Check if this task needs to be dropped. If so, we need to add to
tasksToDrop no matter
- // what its current state is so that it will be dropped
+ // what its current state is so that it will be dropped.
Review comment:
Done.
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -462,10 +427,10 @@ private static boolean isJobComplete(JobContext ctx,
Set<Integer> allPartitions,
* If partition is missing from prevInstanceToTaskAssignments (e.g. previous
assignment is
* deleted) it is added from context. Otherwise, the context won't be
updated.
* @param jobCtx Job Context
- * @param prevInstanceToTaskAssignments instance -> partitionIds from
previous assignment
+ * @param currentInstanceToTaskAssignments instance -> partitionIds from
current assignment
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]