AMBARI-18576. When multiple requests are running, aborting any will incorrectly abort all requests instead of the desired one (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/21314666 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/21314666 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/21314666 Branch: refs/heads/branch-dev-patch-upgrade Commit: 2131466610b41af761fd1a105312c6553685c5ac Parents: 2709304 Author: Alejandro Fernandez <afernan...@hortonworks.com> Authored: Wed Oct 12 15:59:29 2016 -0700 Committer: Alejandro Fernandez <afernan...@hortonworks.com> Committed: Wed Oct 12 15:59:32 2016 -0700 ---------------------------------------------------------------------- .../ambari/server/actionmanager/ActionDBAccessor.java | 13 +++++++++++++ .../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++++++++++ .../ambari/server/actionmanager/ActionScheduler.java | 4 ++-- .../org/apache/ambari/server/orm/dao/StageDAO.java | 10 ++++++++++ .../ambari/server/orm/entities/StageEntity.java | 5 ++++- .../server/actionmanager/TestActionScheduler.java | 12 ++++++------ 6 files changed, 49 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index 0e78cbc..8aef70d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -86,6 +86,19 @@ public interface ActionDBAccessor { public List<Stage> getStagesInProgress(); /** + * Returns all the pending stages in a request, including queued and not-queued. 
A stage is + * considered in progress if it is in progress for any host. + * <p/> + * The results will be sorted by stage ID making this call + * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in + * order to determine if there are stages that are in progress before getting + * the stages from this method. + * + * @see HostRoleStatus#IN_PROGRESS_STATUSES + */ + public List<Stage> getStagesInProgressForRequest(Long requestId); + + /** * Gets the number of commands in progress. * * @return the number of commands in progress. http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index c31ca7e..2c87583 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -270,10 +270,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { */ @Override @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) + public List<Stage> getStagesInProgressForRequest(Long requestId) { + List<StageEntity> stageEntities = stageDAO.findByRequestIdAndCommandStatuses(requestId, HostRoleStatus.IN_PROGRESS_STATUSES); + return getStagesForEntities(stageEntities); + } + + /** + * {@inheritDoc} + */ + @Override + @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) public List<Stage> getStagesInProgress() { List<StageEntity> stageEntities = stageDAO.findByCommandStatuses( HostRoleStatus.IN_PROGRESS_STATUSES); + return getStagesForEntities(stageEntities); + } + @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) + 
private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) { // experimentally enable parallel stage processing @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled(); http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index e380ae4..8cbfb1e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -1139,10 +1139,10 @@ class ActionScheduler implements Runnable { cancelHostRoleCommands(tasksToDequeue, reason); } - // abort any stages in progress; don't execute this for all stages since + // abort any stages in progress that belong to this request; don't execute this for all stages since // that could lead to OOM errors on large requests, like those for // upgrades - List<Stage> stagesInProgress = db.getStagesInProgress(); + List<Stage> stagesInProgress = db.getStagesInProgressForRequest(requestId); for (Stage stageInProgress : stagesInProgress) { abortOperationsForStage(stageInProgress); } http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java index 8ef4a1b..d2f899f 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java @@ -164,6 +164,16 @@ public class StageDAO { } @RequiresSession + public List<StageEntity> findByRequestIdAndCommandStatuses(Long requestId, Collection<HostRoleStatus> statuses) { + TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery( + "StageEntity.findByRequestIdAndCommandStatuses", StageEntity.class); + + query.setParameter("requestId", requestId); + query.setParameter("statuses", statuses); + return daoUtils.selectList(query); + } + + @RequiresSession public List<StageEntity> findByCommandStatuses( Collection<HostRoleStatus> statuses) { TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery( http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java index 7659a23..eaea913 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java @@ -42,7 +42,10 @@ import javax.persistence.Table; @NamedQueries({ @NamedQuery( name = "StageEntity.findByCommandStatuses", - query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId"), + query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN 
:statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, stage.stageId"), + @NamedQuery( + name = "StageEntity.findByRequestIdAndCommandStatuses", + query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"), @NamedQuery( name = "StageEntity.findIdsByRequestId", query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") }) http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index ccb95a6..f86c02e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -2204,10 +2204,9 @@ public class TestActionScheduler { hosts.put(hostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); - long requestId = 1; - - // create 3 stages, each with a single task - the first stage will be completed and should not + // Create a single request with 3 stages, each with a single task - the first stage will be completed and should not // be included when cancelling the unfinished tasks of the request + long requestId = 1; final List<Stage> allStages = new ArrayList<Stage>(); final List<Stage> stagesInProgress = new ArrayList<Stage>(); final List<HostRoleCommand> tasksInProgress = new 
ArrayList<>(); @@ -2219,7 +2218,7 @@ public class TestActionScheduler { Stage stageWithTask = getStageWithSingleTask( hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START, - Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId); + Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int) requestId); // complete the first stage stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED); @@ -2227,7 +2226,7 @@ public class TestActionScheduler { stageWithTask = getStageWithSingleTask( hostname, "cluster1", Role.NAMENODE, RoleCommand.START, - Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId); + Service.Type.HDFS, namenodeCmdTaskId, 2, (int) requestId); tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands()); stagesInProgress.add(stageWithTask); @@ -2235,7 +2234,7 @@ public class TestActionScheduler { stageWithTask = getStageWithSingleTask( hostname, "cluster1", Role.DATANODE, RoleCommand.START, - Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId); + Service.Type.HDFS, datanodeCmdTaskId, 3, (int) requestId); tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands()); stagesInProgress.add(stageWithTask); @@ -2271,6 +2270,7 @@ public class TestActionScheduler { when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size()); when(db.getStagesInProgress()).thenReturn(stagesInProgress); + when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress); when(db.getAllStages(anyLong())).thenReturn(allStages); List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();