Repository: ambari Updated Branches: refs/heads/trunk af647f81c -> c774475b9
AMBARI-21864. DEPENDENCY_ORDERED stage execution hangs in case of circular dependencies between role commands (magyari_sandor) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c774475b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c774475b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c774475b Branch: refs/heads/trunk Commit: c774475b9281b4b04a1135e46e576896c3992e46 Parents: af647f8 Author: Sandor Magyari <smagy...@hortonworks.com> Authored: Fri Sep 1 17:12:20 2017 +0200 Committer: Sandor Magyari <smagy...@hortonworks.com> Committed: Tue Sep 5 14:10:07 2017 +0200 ---------------------------------------------------------------------- .../server/actionmanager/ActionScheduler.java | 41 +++++++- .../actionmanager/TestActionScheduler.java | 104 +++++++++++++++++++ 2 files changed, 141 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c774475b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 9a45d1f..00e4184 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -218,7 +218,6 @@ class ActionScheduler implements Runnable { /** * Unit Test Constructor. 
- * * @param sleepTimeMilliSec * @param actionTimeoutMilliSec * @param db @@ -229,14 +228,17 @@ class ActionScheduler implements Runnable { * @param unitOfWork * @param ambariEventPublisher * @param configuration + * @param entityManagerProvider * @param hostRoleCommandDAO * @param hostRoleCommandFactory + * @param roleCommandOrderProvider */ protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap, UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher, Configuration configuration, Provider<EntityManager> entityManagerProvider, - HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) { + HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory, + RoleCommandOrderProvider roleCommandOrderProvider) { sleepTime = sleepTimeMilliSec; actionTimeout = actionTimeoutMilliSec; @@ -252,12 +254,40 @@ class ActionScheduler implements Runnable { this.hostRoleCommandDAO = hostRoleCommandDAO; this.hostRoleCommandFactory = hostRoleCommandFactory; jpaPublisher = null; + this.roleCommandOrderProvider = roleCommandOrderProvider; serverActionExecutor = new ServerActionExecutor(db, sleepTime); initializeCaches(); } /** + * Unit Test Constructor. 
+ * + * @param sleepTimeMilliSec + * @param actionTimeoutMilliSec + * @param db + * @param actionQueue + * @param fsmObject + * @param maxAttempts + * @param hostsMap + * @param unitOfWork + * @param ambariEventPublisher + * @param configuration + * @param hostRoleCommandDAO + * @param hostRoleCommandFactory + */ + protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db, + ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap, + UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher, + Configuration configuration, Provider<EntityManager> entityManagerProvider, + HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) { + + this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork, + ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory, + null); + } + + /** * Initializes the caches. 
*/ private void initializeCaches() { @@ -888,8 +918,11 @@ class ActionScheduler implements Runnable { boolean areCommandDependenciesFinished = true; RoleCommandOrder rco = roleCommandOrderProvider.getRoleCommandOrder(stage.getClusterId()); if (rco != null) { - Set<RoleCommandPair> roleCommandDependencies = rco.getDependencies().get(new - RoleCommandPair(Role.valueOf(command.getRole()), command.getRoleCommand())); + RoleCommandPair roleCommand = new + RoleCommandPair(Role.valueOf(command.getRole()), command.getRoleCommand()); + Set<RoleCommandPair> roleCommandDependencies = rco.getDependencies().get(roleCommand); + // remove eventual references to the same RoleCommand + roleCommandDependencies.remove(roleCommand); // check if there are any dependencies IN_PROGRESS if (roleCommandDependencies != null && CollectionUtils.containsAny(rolesCommandsInProgress, roleCommandDependencies)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/c774475b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 34ee27a..de622a7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -74,6 +75,9 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.events.AmbariEvent; import 
org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.metadata.RoleCommandOrder; +import org.apache.ambari.server.metadata.RoleCommandOrderProvider; +import org.apache.ambari.server.metadata.RoleCommandPair; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.dao.HostDAO; @@ -253,6 +257,106 @@ public class TestActionScheduler { EasyMock.verify(entityManagerProviderMock); } + + /** + * This test sends a new action to the action scheduler and verifies that the action + * shows up in the action queue in case of DEPENDENCY_ORDERED execution type, with RoleCommand + * having dependencies on itself. + */ + @Test + public void testActionScheduleWithDependencyOrderedCommandExecution() throws Exception { + Type type = new TypeToken<Map<String, Set<String>>>() {}.getType(); + Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type); + + ActionQueue aq = new ActionQueue(); + Properties properties = new Properties(); + properties.setProperty("server.stage.command.execution_type", "DEPENDENCY_ORDERED"); + Configuration conf = new Configuration(properties); + Clusters fsm = mock(Clusters.class); + Cluster oneClusterMock = mock(Cluster.class); + Service serviceObj = mock(Service.class); + ServiceComponent scomp = mock(ServiceComponent.class); + ServiceComponentHost sch = mock(ServiceComponentHost.class); + UnitOfWork unitOfWork = mock(UnitOfWork.class); + RoleCommandOrderProvider rcoProvider = mock(RoleCommandOrderProvider.class); + RoleCommandOrder rco = mock(RoleCommandOrder.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(fsm.getClusterById(anyLong())).thenReturn(oneClusterMock); + when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); + when(oneClusterMock.getClusterId()).thenReturn(Long.valueOf(1L)); + 
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); + when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); + when(serviceObj.getCluster()).thenReturn(oneClusterMock); + + when(rcoProvider.getRoleCommandOrder(1L)).thenReturn(rco); + Map<RoleCommandPair, Set<RoleCommandPair>> roleCommandDependencies = new HashMap(); + RoleCommandPair roleCommand = new + RoleCommandPair(Role.valueOf("NAMENODE"), RoleCommand.INSTALL); + Set<RoleCommandPair> namenodeInstallDependencies = new HashSet<>(); + namenodeInstallDependencies.add(roleCommand); + roleCommandDependencies.put(roleCommand, namenodeInstallDependencies); + when(rco.getDependencies()).thenReturn(roleCommandDependencies); + + Host host = mock(Host.class); + HashMap<String, ServiceComponentHost> hosts = + new HashMap<>(); + hosts.put(hostname, sch); + HostEntity hostEntity = new HostEntity(); + hostEntity.setHostName(hostname); + hostDAO.merge(hostEntity); + when(scomp.getServiceComponentHosts()).thenReturn(hosts); + + when(fsm.getHost(anyString())).thenReturn(host); + when(host.getState()).thenReturn(HostState.HEALTHY); + when(host.getHostName()).thenReturn(hostname); + + ActionDBAccessor db = mock(ActionDBAccessorImpl.class); + HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class); + Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class)); + Stage s = StageUtils.getATestStage(1, 977, hostname, + "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); + s.setCommandExecutionType(CommandExecutionType.DEPENDENCY_ORDERED); + List<Stage> stages = Collections.singletonList(s); + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); + when(db.getFirstStageInProgressPerRequest()).thenReturn(stages); + + RequestEntity request = mock(RequestEntity.class); + when(request.isExclusive()).thenReturn(false); + when(request.getClusterHostInfo()).thenReturn(CLUSTER_HOST_INFO); + 
when(db.getRequestEntity(anyLong())).thenReturn(request); + + + //Keep large number of attempts so that the task is not expired finally + //Small action timeout to test rescheduling + ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, + 10000, new HostsMap((String) null), unitOfWork, null, conf, + entityManagerProviderMock, hostRoleCommandDAOMock, null, rcoProvider); + scheduler.setTaskTimeoutAdjustment(false); + + List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); + AgentCommand scheduledCommand = ac.get(0); + assertTrue(scheduledCommand instanceof ExecutionCommand); + assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId()); + assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo()); + + //The action status has not changed, it should be queued again. + ac = waitForQueueSize(hostname, aq, 2, scheduler); + // first command is cancel for previous + scheduledCommand = ac.get(1); + assertTrue(scheduledCommand instanceof ExecutionCommand); + assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId()); + assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo()); + + //Now change the action status + s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED); + ac = aq.dequeueAll(hostname); + + //Wait for some time, it shouldn't be scheduled this time. + ac = waitForQueueSize(hostname, aq, 0, scheduler); + + EasyMock.verify(entityManagerProviderMock); + } + private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq, int expectedQueueSize, ActionScheduler scheduler) { int cycleCount = 0;