alirezazamani commented on a change in pull request #1040:
URL: https://github.com/apache/helix/pull/1040#discussion_r433463896
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -449,43 +448,31 @@ private void updatePartitionInformationInJobContext(CurrentStateOutput currentSt
 * @param paMap
 * @param assignedPartitions
 */
- private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
- String pName, String instance, Message pendingMessage, TaskState jobState,
- TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
- Map<String, Set<Integer>> assignedPartitions) {
-
- // stateMap is a mapping of Instance -> TaskPartitionState (String)
- Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
- if (stateMap != null) {
- String prevState = stateMap.get(instance);
- if (!pendingMessage.getToState().equals(prevState)) {
- LOG.warn(String.format(
- "Task pending to-state is %s while previous assigned state is %s. This should not"
- + "happen.",
- pendingMessage.getToState(), prevState));
+ private void processTaskWithPendingMessage(Integer pId, String pName, String instance,
+ Message pendingMessage, TaskState jobState, TaskPartitionState currState,
+ Map<Integer, PartitionAssignment> paMap, Map<String, Set<Integer>> assignedPartitions) {
+
+ if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT
+ && pendingMessage.getToState().equals(TaskPartitionState.RUNNING.name())) {
+ // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
+ // so that Helix will cancel the transition.
+ paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
Review comment:
@narendly Thank you for the suggestion. I think what you are suggesting
would also work. However, I think the logic in this part of the code should
stay untouched as much as possible. If we want to send DROPPED here (although
I think it would work), we need to release the quota and make further logic
changes. The reason is that once we DROP the task from the instance, we lose
its currentState, so the other methods will no longer see the task and cannot
release its quota. Hence, I prefer to send DROPPED for the task using a
two-step approach (i.e. first send it to INIT and then DROP it).
Also, since there is a pending INIT->RUNNING message, it is not very safe to
release the quota here. The task might reach the RUNNING state before we
cancel the transition, in which case the quota information would no longer
match the actual state. So in my opinion the better option is to first let the
task go back to INIT, and then release the quota and send it to DROPPED.
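As a rough illustration of the two-step idea above, here is a minimal,
self-contained sketch. It is not Helix code: the class TwoStepDropSketch, the
enums JobState and PartitionState, the Quota counter, and the method
nextTargetState are all hypothetical names made up for this example. It only
models the ordering argument: while an INIT->RUNNING message is pending, ask
for INIT and keep the quota; once the task is settled in INIT with nothing
pending, release the quota and ask for DROPPED.

```java
public class TwoStepDropSketch {
  // Hypothetical, simplified stand-ins for the job and task states discussed above.
  enum JobState { IN_PROGRESS, TIMING_OUT, TIMED_OUT }
  enum PartitionState { INIT, RUNNING, DROPPED }

  // Hypothetical quota counter; an assumption for illustration only,
  // not the real AssignableInstanceManager.
  static final class Quota {
    private int used;
    void acquire() { used++; }
    void release() { if (used > 0) { used--; } }
    int used() { return used; }
  }

  /**
   * Decides the state to request for one task in one pass.
   * pendingTo is the to-state of an in-flight message, or null if none is pending.
   */
  static PartitionState nextTargetState(JobState job, PartitionState current,
      PartitionState pendingTo, Quota quota) {
    if (pendingTo != null) {
      if (job == JobState.TIMING_OUT && current == PartitionState.INIT
          && pendingTo == PartitionState.RUNNING) {
        // Step 1: request INIT so the pending transition gets cancelled.
        // The quota is kept, because the task may still reach RUNNING.
        return PartitionState.INIT;
      }
      // Any other in-flight transition is left alone in this sketch.
      return pendingTo;
    }
    boolean jobEnding = job == JobState.TIMING_OUT || job == JobState.TIMED_OUT;
    if (jobEnding && current == PartitionState.INIT) {
      // Step 2: the task is settled in INIT, so releasing the quota is safe.
      quota.release();
      return PartitionState.DROPPED;
    }
    return current;
  }

  public static void main(String[] args) {
    Quota quota = new Quota();
    quota.acquire(); // the task was assigned earlier and holds one unit of quota

    PartitionState first = nextTargetState(
        JobState.TIMING_OUT, PartitionState.INIT, PartitionState.RUNNING, quota);
    System.out.println("pass 1: target=" + first + ", quotaUsed=" + quota.used());

    PartitionState second = nextTargetState(
        JobState.TIMING_OUT, PartitionState.INIT, null, quota);
    System.out.println("pass 2: target=" + second + ", quotaUsed=" + quota.used());
  }
}
```

In this sketch the quota is only touched in the second pass, which is the
point of going through INIT before DROPPED.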
To answer your question, we actually handle releasing the quota for this case
in the updatePreviousAssignedTasksStatus method. Here is the code that does
that:
```
case INIT: {
  // INIT is a temporary state for tasks
  // Two possible scenarios for INIT:
  // 1. Task is getting scheduled for the first time. In this case, Task's state will go
  // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
  // 2. Task is getting scheduled for the first time, but in this case, job is timed out or
  // timing out. In this case, it will be sent back to INIT state to be removed. Here we
  // ensure that this task then goes from INIT to DROPPED so that it will be released from
  // AssignableInstance to prevent resource leak
  if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
      || jobTgtState == TargetState.DELETE) {
    // Job is timed out or timing out or targetState is to be deleted, so its tasks will be
    // sent back to INIT
    // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
    partitionsToDropFromIs.add(pId);
    assignedPartitions.get(instance).add(pId);
    paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
    // Also release resources for these tasks
    assignableInstanceManager.release(instance, taskConfig, quotaType);
    break;
  }
```
Your comment actually guided me to another safety check that we can add for
the INIT case. I added two lines to make sure we DROP the task if the job is
TIMED_OUT or TIMING_OUT and the task is in the INIT state. Thanks for the
comments and the careful review.