AMBARI-17374. Ambari reports "IN PROGRESS" status for a finished install task. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/21a54489
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/21a54489
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/21a54489

Branch: refs/heads/trunk
Commit: 21a544891e7388c807ad8b88eb80afada9e31b19
Parents: 7a43ef4
Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Authored: Wed Jun 22 19:13:02 2016 +0300
Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Committed: Wed Jun 29 19:21:57 2016 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  9 +++-
 .../ambari_agent/CustomServiceOrchestrator.py   |  5 +-
 .../TestCustomServiceOrchestrator.py            |  2 +-
 .../server/actionmanager/ActionManager.java     |  7 +++
 .../server/actionmanager/ActionScheduler.java   | 13 +++++
 .../server/actionmanager/TestActionManager.java | 56 ++++++++++++++++++++
 .../actionmanager/TestActionScheduler.java      | 20 ++++---
 .../server/agent/HeartbeatProcessorTest.java    |  4 +-
 8 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index f217a54..60c72af 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -26,6 +26,7 @@ import pprint
 import os
 import ambari_simplejson as json
 import time
+import signal
 
 from AgentException import AgentException
 from LiveStatus import LiveStatus
@@ -141,7 +142,7 @@ class ActionQueue(threading.Thread):
           logger.info("Canceling " + queued_command['commandType'] + \
                       " for service " + queued_command['serviceName'] + \
                       " and role " +  queued_command['role'] + \
-                      " with taskId " + queued_command['taskId'])
+                      " with taskId " + str(queued_command['taskId']))
 
       # Kill if in progress
       self.customServiceOrchestrator.cancel_command(task_id, reason)
@@ -313,7 +314,11 @@ class ActionQueue(threading.Thread):
         if commandresult['exitcode'] == 0:
           status = self.COMPLETED_STATUS
         else:
-          status = self.FAILED_STATUS
+          if (commandresult['exitcode'] == -signal.SIGTERM) or 
(commandresult['exitcode'] == -signal.SIGKILL):
+            logger.info('Command {cid} was canceled!'.format(cid=taskId))
+            return
+          else:
+            status = self.FAILED_STATUS
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
         delay = self.get_retry_delay(delay)

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index fc1b72a..57416a4 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -244,7 +244,10 @@ class CustomServiceOrchestrator():
         logger.debug('Pop with taskId %s' % task_id)
         pid = self.commands_in_progress.pop(task_id)
         if not isinstance(pid, int):
-          return '\nCommand aborted. ' + pid
+          if pid:
+            return '\nCommand aborted. ' + pid
+          else:
+            return ''
     return None
 
   def requestComponentStatus(self, command):

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 0ff0ba5..c9724b7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -440,7 +440,7 @@ class TestCustomServiceOrchestrator(TestCase):
 
     time.sleep(.1)
 
-    orchestrator.cancel_command(19,'')
+    orchestrator.cancel_command(19,'reason')
     self.assertTrue(kill_process_with_children_mock.called)
     kill_process_with_children_mock.assert_called_with(33)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 71364c2..2b121dc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -141,6 +142,12 @@ public class ActionManager {
       return;
     }
 
+    // Process the agent's reports in ascending task-id order.
+    // NOTE(review): presumably this keeps the reports aligned with `commands`, which is walked
+    // below through commandIterator -- confirm `commands` is also ordered by task id.
+    // NOTE(review): Collections.sort reorders the caller's list in place and throws
+    // UnsupportedOperationException for unmodifiable lists; consider sorting a copy instead.
+    Collections.sort(reports, new Comparator<CommandReport>() {
+      @Override
+      public int compare(CommandReport o1, CommandReport o2) {
+        // Long.compare instead of (int) (a - b): casting a long difference to int can
+        // overflow or truncate and report the wrong ordering.
+        return Long.compare(o1.getTaskId(), o2.getTaskId());
+      }
+    });
     List<CommandReport> reportsToProcess = new ArrayList<CommandReport>();
     Iterator<HostRoleCommand> commandIterator = commands.iterator();
     //persist the action response into the db.

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index b3aab9f..bf2ff38 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -722,6 +722,8 @@ class ActionScheduler implements Runnable {
             LOG.info("Removing command from queue, host={}, commandId={} ", 
host, c.getCommandId());
             actionQueue.dequeue(host, c.getCommandId());
           } else {
+            
cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, 
roleStr)));
+
             LOG.info("Host: {}, role: {}, actionId: {} timed out and will be 
rescheduled", host,
                 roleStr, s.getActionId());
 
@@ -1072,6 +1074,17 @@ class ActionScheduler implements Runnable {
       }
     }
   }
+
+  /**
+   * Enqueues a {@link CancelCommand} (with an empty reason) on the host of every given command
+   * whose status is QUEUED or IN_PROGRESS. Commands in any other status are left untouched.
+   *
+   * @param hostRoleCommands commands to cancel if they are still queued or running
+   */
+  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
+    for (HostRoleCommand command : hostRoleCommands) {
+      HostRoleStatus currentStatus = command.getStatus();
+      boolean stillActive = currentStatus == HostRoleStatus.QUEUED
+          || currentStatus == HostRoleStatus.IN_PROGRESS;
+      if (!stillActive) {
+        continue;
+      }
+      CancelCommand cancel = new CancelCommand();
+      cancel.setTargetTaskId(command.getTaskId());
+      cancel.setReason("");
+      actionQueue.enqueue(command.getHostName(), cancel);
+    }
+  }
 
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index baee0d8..f85b95d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -134,6 +134,45 @@ public class TestActionManager {
   }
 
   @Test
+  public void testActionResponsesUnsorted() throws AmbariException {
+    ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
+    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
+        clusters, db, new HostsMap((String) null), unitOfWork,
+        injector.getInstance(RequestFactory.class), null, null);
+    populateActionDBWithTwoCommands(db, hostname);
+    Stage stage = db.getAllStages(requestId).get(0);
+    Assert.assertEquals(stageId, stage.getStageId());
+    stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.QUEUED);
+    db.hostRoleScheduled(stage, hostname, "HBASE_MASTER");
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setTaskId(2);
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setRole("HBASE_REGIONSERVER");
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("ERROR");
+    cr.setStdOut("OUTPUT");
+    cr.setStructuredOut("STRUCTURED_OUTPUT");
+    cr.setExitCode(215);
+    reports.add(cr);
+    CommandReport cr2 = new CommandReport();
+    cr2.setTaskId(1);
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setRole("HBASE_MASTER");
+    cr2.setStatus("IN_PROGRESS");
+    cr2.setStdErr("ERROR");
+    cr2.setStdOut("OUTPUT");
+    cr2.setStructuredOut("STRUCTURED_OUTPUT");
+    cr2.setExitCode(215);
+    reports.add(cr2);
+    am.processTaskResponse(hostname, reports, am.getTasks(Arrays.asList(new 
Long[]{1L, 2L})));
+    assertEquals(HostRoleStatus.IN_PROGRESS, am.getAction(requestId, stageId)
+        .getHostRoleStatus(hostname, "HBASE_MASTER"));
+    assertEquals(HostRoleStatus.PENDING, am.getAction(requestId, stageId)
+        .getHostRoleStatus(hostname, "HBASE_REGIONSERVER"));
+  }
+
+  @Test
   public void testLargeLogs() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
@@ -189,6 +228,23 @@ public class TestActionManager {
     db.persistActions(request);
   }
 
+  /**
+   * Persists one request containing a single stage with two START commands on {@code hostname}:
+   * HBASE_MASTER followed by HBASE_REGIONSERVER.
+   * NOTE(review): the calling test assumes HBASE_MASTER receives task id 1 and
+   * HBASE_REGIONSERVER task id 2 -- keep the insertion order below in sync with that.
+   */
+  private void populateActionDBWithTwoCommands(ActionDBAccessor db, String hostname)
+      throws AmbariException {
+    Stage stage = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L,
+        "action manager test", "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+    stage.setStageId(stageId);
+    for (Role role : new Role[] {Role.HBASE_MASTER, Role.HBASE_REGIONSERVER}) {
+      stage.addHostRoleExecutionCommand(hostname, role, RoleCommand.START,
+          new ServiceComponentHostStartEvent(role.toString(), hostname,
+              System.currentTimeMillis()),
+          "cluster1", "HBASE", false, false);
+    }
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(stage);
+    db.persistActions(new Request(stages, clusters));
+  }
+
   // Test failing ... tracked by Jira BUG-4966
   @Ignore
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index d92d87a..e0f67af 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -210,15 +210,18 @@ public class TestActionScheduler {
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
-    assertTrue(ac.get(0) instanceof ExecutionCommand);
-    assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
-    assertEquals(clusterHostInfo, ((ExecutionCommand) 
(ac.get(0))).getClusterHostInfo());
+    AgentCommand scheduledCommand = ac.get(0);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) 
scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) 
scheduledCommand).getClusterHostInfo());
 
     //The action status has not changed, it should be queued again.
-    ac = waitForQueueSize(hostname, aq, 1, scheduler);
-    assertTrue(ac.get(0) instanceof ExecutionCommand);
-    assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
-    assertEquals(clusterHostInfo, ((ExecutionCommand) 
(ac.get(0))).getClusterHostInfo());
+    ac = waitForQueueSize(hostname, aq, 2, scheduler);
+    // first command is cancel for previous
+    scheduledCommand = ac.get(1);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) 
scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) 
scheduledCommand).getClusterHostInfo());
 
     //Now change the action status
     s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED);
@@ -323,6 +326,9 @@ public class TestActionScheduler {
     //Check that in_progress command is rescheduled
     assertEquals(HostRoleStatus.QUEUED, 
stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE"));
 
+    // Check was generated cancel command on timeout
+    assertFalse(aq.dequeue(hostname, 
AgentCommandType.CANCEL_COMMAND).isEmpty());
+
     //Switch command back to IN_PROGRESS status and check that other command 
is not rescheduled
     stages.get(0).setHostRoleStatus(hostname, "SECONDARY_NAMENODE", 
HostRoleStatus.IN_PROGRESS);
     scheduler.doWork();

http://git-wip-us.apache.org/repos/asf/ambari/blob/21a54489/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
index bdbb9ab..913c4ea 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
@@ -1269,7 +1269,9 @@ public class HeartbeatProcessorTest {
     cmdReport.setRole("install_packages");
     cmdReport.setClusterName(DummyCluster);
 
-    hb.setReports(Collections.singletonList(cmdReport));
+    List<CommandReport> reports = new ArrayList<>();
+    reports.add(cmdReport);
+    hb.setReports(reports);
     hb.setTimestamp(0L);
     hb.setResponseId(0);
     hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, 
DummyHostStatus));

Reply via email to