alirezazamani commented on a change in pull request #1040:
URL: https://github.com/apache/helix/pull/1040#discussion_r433463896



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -449,43 +448,31 @@ private void updatePartitionInformationInJobContext(CurrentStateOutput currentSt
    * @param paMap
    * @param assignedPartitions
    */
-  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
-      String pName, String instance, Message pendingMessage, TaskState jobState,
-      TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
-      Map<String, Set<Integer>> assignedPartitions) {
-
-    // stateMap is a mapping of Instance -> TaskPartitionState (String)
-    Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-    if (stateMap != null) {
-      String prevState = stateMap.get(instance);
-      if (!pendingMessage.getToState().equals(prevState)) {
-        LOG.warn(String.format(
-            "Task pending to-state is %s while previous assigned state is %s. This should not"
-                + "happen.",
-            pendingMessage.getToState(), prevState));
+  private void processTaskWithPendingMessage(Integer pId, String pName, String instance,
+      Message pendingMessage, TaskState jobState, TaskPartitionState currState,
+      Map<Integer, PartitionAssignment> paMap, Map<String, Set<Integer>> assignedPartitions) {
+
+    if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT
+        && pendingMessage.getToState().equals(TaskPartitionState.RUNNING.name())) {
+      // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
+      // so that Helix will cancel the transition.
+      paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));

Review comment:
   @narendly Thank you for the suggestion. I think what you are suggesting
would also work. However, I think the logic in this part of the code should be
left untouched as much as possible. If we want to send DROPPED here (although I
think it would work), we need to release the quota and make further logic
changes. The reason is that once we DROP the task from the instance, we lose its
currentState and will no longer see the task in the other methods that release
quota. Hence, I prefer to send DROPPED for the task using a two-level approach
(i.e., first send it to INIT and then DROP it).
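
A rough standalone sketch of the two-level approach described above. It is not the Helix implementation: `TwoStepDropSketch`, its enums, and `nextAssignment` are hypothetical names, and the real dispatcher also considers states and target states that are omitted here.

```java
// Standalone illustration only; these types are hypothetical and do not
// mirror the real Helix classes.
final class TwoStepDropSketch {
  enum JobState { IN_PROGRESS, TIMING_OUT, TIMED_OUT }
  enum TaskState { INIT, RUNNING, DROPPED }

  /**
   * Returns the state to assign to a task next.
   *
   * @param pendingTo target state of a pending message, or null if none
   */
  static TaskState nextAssignment(JobState job, TaskState current, TaskState pendingTo) {
    boolean jobEnding = job == JobState.TIMING_OUT || job == JobState.TIMED_OUT;
    if (!jobEnding) {
      // Job is healthy: let any pending transition proceed.
      return pendingTo != null ? pendingTo : current;
    }
    if (current == TaskState.INIT && pendingTo == TaskState.RUNNING) {
      // Level 1: re-assign INIT so the pending INIT->RUNNING is cancelled.
      return TaskState.INIT;
    }
    if (current == TaskState.INIT && pendingTo == null) {
      // Level 2: the task has settled in INIT and its current state is still
      // visible, so quota can be released and the task dropped.
      return TaskState.DROPPED;
    }
    return current;
  }
}
```

Under these assumptions, a task with a pending INIT->RUNNING message in a timing-out job is first held at INIT, and only a later pass (no pending message) moves it to DROPPED.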
   
   Also, since there is a pending INIT->RUNNING message, it is not very safe to
release the quota here: the task might reach the RUNNING state before we cancel
it, and the quota information would then not match the actual state. So, in my
opinion, the better option is to first let the task go to INIT, and then release
the quota and send it to DROPPED.
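
To make the quota concern concrete, here is a minimal sketch assuming a simple per-instance slot counter. `QuotaReleaseSketch` and its methods are hypothetical and are not the `AssignableInstanceManager` API; the point is only that a slot is freed once the task is confirmed in INIT with no message in flight.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical quota tracker for illustration; not a Helix class.
final class QuotaReleaseSketch {
  private final int capacity;
  private final Set<Integer> holders = new HashSet<>();

  QuotaReleaseSketch(int capacity) {
    this.capacity = capacity;
  }

  /** Reserves a slot for the task; returns false if no slot is free. */
  boolean reserve(int taskId) {
    if (holders.size() >= capacity) {
      return false;
    }
    return holders.add(taskId);
  }

  /**
   * Releases the task's slot only when the task is known to be in INIT and
   * no transition message is pending. While INIT->RUNNING is still in
   * flight the task may start running, so the slot must stay reserved.
   */
  boolean releaseIfSettled(int taskId, boolean inInit, boolean hasPendingMessage) {
    if (!inInit || hasPendingMessage) {
      return false;
    }
    return holders.remove(taskId);
  }

  int used() {
    return holders.size();
  }
}
```

With a capacity of one, a second task cannot be scheduled while the first still has a pending message, which avoids two tasks running against a single slot.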
   
   To answer your question, we actually handle the quota release for this case
in the updatePreviousAssignedTasksStatus method. Here is the code that does
that:
   
   ```
           case INIT: {
             // INIT is a temporary state for tasks
             // Two possible scenarios for INIT:
             // 1. Task is getting scheduled for the first time. In this case, Task's state will go
             // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
             // 2. Task is getting scheduled for the first time, but in this case, job is timed out or
             // timing out. In this case, it will be sent back to INIT state to be removed. Here we
             // ensure that this task then goes from INIT to DROPPED so that it will be released from
             // AssignableInstance to prevent resource leak
             if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
                 || jobTgtState == TargetState.DELETE) {
               // Job is timed out or timing out or targetState is to be deleted, so its tasks will be
               // sent back to INIT
               // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
               partitionsToDropFromIs.add(pId);

               assignedPartitions.get(instance).add(pId);
               paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));

               // Also release resources for these tasks
               assignableInstanceManager.release(instance, taskConfig, quotaType); // <-- quota is released here
               break;
             }
   ```
   
   Your comment actually prompted me to add another safety check for the INIT
case. I added two lines to make sure we DROP the task if the job is TIMED_OUT or
TIMING_OUT and the task is in the INIT state. Thanks for the comments and the
careful review.



