Repository: ambari
Updated Branches:
  refs/heads/trunk af647f81c -> c774475b9


AMBARI-21864. DEPENDENCY_ORDERED stage execution hangs in case of circular 
dependencies between role commands (magyari_sandor)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c774475b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c774475b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c774475b

Branch: refs/heads/trunk
Commit: c774475b9281b4b04a1135e46e576896c3992e46
Parents: af647f8
Author: Sandor Magyari <smagy...@hortonworks.com>
Authored: Fri Sep 1 17:12:20 2017 +0200
Committer: Sandor Magyari <smagy...@hortonworks.com>
Committed: Tue Sep 5 14:10:07 2017 +0200

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   |  41 +++++++-
 .../actionmanager/TestActionScheduler.java      | 104 +++++++++++++++++++
 2 files changed, 141 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c774475b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 9a45d1f..00e4184 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -218,7 +218,6 @@ class ActionScheduler implements Runnable {
 
   /**
    * Unit Test Constructor.
-   *
    * @param sleepTimeMilliSec
    * @param actionTimeoutMilliSec
    * @param db
@@ -229,14 +228,17 @@ class ActionScheduler implements Runnable {
    * @param unitOfWork
    * @param ambariEventPublisher
    * @param configuration
+   * @param entityManagerProvider
    * @param hostRoleCommandDAO
    * @param hostRoleCommandFactory
+   * @param roleCommandOrderProvider
    */
   protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
                             ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
                             UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
                             Configuration configuration, Provider<EntityManager> entityManagerProvider,
-                            HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) {
+                            HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
+                            RoleCommandOrderProvider roleCommandOrderProvider) {
 
     sleepTime = sleepTimeMilliSec;
     actionTimeout = actionTimeoutMilliSec;
@@ -252,12 +254,45 @@ class ActionScheduler implements Runnable {
     this.hostRoleCommandDAO = hostRoleCommandDAO;
     this.hostRoleCommandFactory = hostRoleCommandFactory;
     jpaPublisher = null;
+    this.roleCommandOrderProvider = roleCommandOrderProvider;
 
     serverActionExecutor = new ServerActionExecutor(db, sleepTime);
     initializeCaches();
   }
 
   /**
+   * Unit Test Constructor.
+   *
+   * Passes {@code null} as the {@link RoleCommandOrderProvider}, so it must not be used where
+   * the role command order is looked up (e.g. DEPENDENCY_ORDERED stage execution); use the
+   * overload that takes a {@code RoleCommandOrderProvider} instead.
+   *
+   * @param sleepTimeMilliSec
+   * @param actionTimeoutMilliSec
+   * @param db
+   * @param actionQueue
+   * @param fsmObject
+   * @param maxAttempts
+   * @param hostsMap
+   * @param unitOfWork
+   * @param ambariEventPublisher
+   * @param configuration
+   * @param entityManagerProvider
+   * @param hostRoleCommandDAO
+   * @param hostRoleCommandFactory
+   */
+  protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
+                            ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
+                            UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
+                            Configuration configuration, Provider<EntityManager> entityManagerProvider,
+                            HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) {
+
+    this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork,
+            ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory,
+            null);
+  }
+
+  /**
    * Initializes the caches.
    */
   private void initializeCaches() {
@@ -888,8 +923,14 @@ class ActionScheduler implements Runnable {
     boolean areCommandDependenciesFinished = true;
     RoleCommandOrder rco = roleCommandOrderProvider.getRoleCommandOrder(stage.getClusterId());
     if (rco != null) {
-      Set<RoleCommandPair> roleCommandDependencies = rco.getDependencies().get(new
-        RoleCommandPair(Role.valueOf(command.getRole()), command.getRoleCommand()));
+      RoleCommandPair roleCommand = new
+              RoleCommandPair(Role.valueOf(command.getRole()), command.getRoleCommand());
+      Set<RoleCommandPair> roleCommandDependencies = rco.getDependencies().get(roleCommand);
+      // remove possible references to the same RoleCommand so a self-dependency cannot block the
+      // command (AMBARI-21864); get() returns null when the command has no dependencies at all
+      if (roleCommandDependencies != null) {
+        roleCommandDependencies.remove(roleCommand);
+      }
 
       // check if there are any dependencies IN_PROGRESS
       if (roleCommandDependencies != null && CollectionUtils.containsAny(rolesCommandsInProgress, roleCommandDependencies)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c774475b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 34ee27a..de622a7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -48,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -74,6 +75,9 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.events.AmbariEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.metadata.RoleCommandOrder;
+import org.apache.ambari.server.metadata.RoleCommandOrderProvider;
+import org.apache.ambari.server.metadata.RoleCommandPair;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -253,6 +257,106 @@ public class TestActionScheduler {
     EasyMock.verify(entityManagerProviderMock);
   }
 
+
+  /**
+   * This test sends a new action to the action scheduler and verifies that the action
+   * shows up in the action queue in case of DEPENDENCY_ORDERED execution type, with a RoleCommand
+   * that depends on itself.
+   */
+  @Test
+  public void testActionScheduleWithDependencyOrderedCommandExecution() throws Exception {
+    Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
+    Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
+
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    properties.setProperty("server.stage.command.execution_type", "DEPENDENCY_ORDERED");
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    RoleCommandOrderProvider rcoProvider = mock(RoleCommandOrderProvider.class);
+    RoleCommandOrder rco = mock(RoleCommandOrder.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(fsm.getClusterById(anyLong())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(oneClusterMock.getClusterId()).thenReturn(Long.valueOf(1L));
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    when(rcoProvider.getRoleCommandOrder(1L)).thenReturn(rco);
+    Map<RoleCommandPair, Set<RoleCommandPair>> roleCommandDependencies = new HashMap<>();
+    RoleCommandPair roleCommand = new
+            RoleCommandPair(Role.valueOf("NAMENODE"), RoleCommand.INSTALL);
+    Set<RoleCommandPair> namenodeInstallDependencies = new HashSet<>();
+    namenodeInstallDependencies.add(roleCommand);
+    roleCommandDependencies.put(roleCommand, namenodeInstallDependencies);
+    when(rco.getDependencies()).thenReturn(roleCommandDependencies);
+
+    Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<>();
+    hosts.put(hostname, sch);
+    HostEntity hostEntity = new HostEntity();
+    hostEntity.setHostName(hostname);
+    hostDAO.merge(hostEntity);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(hostname);
+
+    ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
+    Stage s = StageUtils.getATestStage(1, 977, hostname,
+            "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
+    s.setCommandExecutionType(CommandExecutionType.DEPENDENCY_ORDERED);
+    List<Stage> stages = Collections.singletonList(s);
+    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
+    when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
+
+    RequestEntity request = mock(RequestEntity.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(request.getClusterHostInfo()).thenReturn(CLUSTER_HOST_INFO);
+    when(db.getRequestEntity(anyLong())).thenReturn(request);
+
+
+    //Keep large number of attempts so that the task is not expired finally
+    //Small action timeout to test rescheduling
+    ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
+            10000, new HostsMap((String) null), unitOfWork, null, conf,
+            entityManagerProviderMock, hostRoleCommandDAOMock, null, rcoProvider);
+    scheduler.setTaskTimeoutAdjustment(false);
+
+    List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
+    AgentCommand scheduledCommand = ac.get(0);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo());
+
+    //The action status has not changed, it should be queued again.
+    ac = waitForQueueSize(hostname, aq, 2, scheduler);
+    // first command is cancel for previous
+    scheduledCommand = ac.get(1);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo());
+
+    //Now change the action status
+    s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED);
+    ac = aq.dequeueAll(hostname);
+
+    //Wait for sometime, it shouldn't be scheduled this time.
+    ac = waitForQueueSize(hostname, aq, 0, scheduler);
+
+    EasyMock.verify(entityManagerProviderMock);
+  }
+
   private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq,
       int expectedQueueSize, ActionScheduler scheduler) {
     int cycleCount = 0;

Reply via email to