AMBARI-18576. When multiple requests are running, aborting any will incorrectly 
abort all requests instead the desired one (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/21314666
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/21314666
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/21314666

Branch: refs/heads/branch-feature-AMBARI-18456
Commit: 2131466610b41af761fd1a105312c6553685c5ac
Parents: 2709304
Author: Alejandro Fernandez <afernan...@hortonworks.com>
Authored: Wed Oct 12 15:59:29 2016 -0700
Committer: Alejandro Fernandez <afernan...@hortonworks.com>
Committed: Wed Oct 12 15:59:32 2016 -0700

----------------------------------------------------------------------
 .../ambari/server/actionmanager/ActionDBAccessor.java | 13 +++++++++++++
 .../server/actionmanager/ActionDBAccessorImpl.java    | 14 ++++++++++++++
 .../ambari/server/actionmanager/ActionScheduler.java  |  4 ++--
 .../org/apache/ambari/server/orm/dao/StageDAO.java    | 10 ++++++++++
 .../ambari/server/orm/entities/StageEntity.java       |  5 ++++-
 .../server/actionmanager/TestActionScheduler.java     | 12 ++++++------
 6 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 0e78cbc..8aef70d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -86,6 +86,19 @@ public interface ActionDBAccessor {
   public List<Stage> getStagesInProgress();
 
   /**
+   * Returns all the pending stages in a request, including queued and 
not-queued. A stage is
+   * considered in progress if it is in progress for any host.
+   * <p/>
+   * The results will be sorted by stage ID making this call
+   * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
+   * order to determine if there are stages that are in progress before getting
+   * the stages from this method.
+   *
+   * @see HostRoleStatus#IN_PROGRESS_STATUSES
+   */
+  public List<Stage> getStagesInProgressForRequest(Long requestId);
+
+  /**
    * Gets the number of commands in progress.
    *
    * @return the number of commands in progress.

http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index c31ca7e..2c87583 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -270,10 +270,24 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
    */
   @Override
   @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+  public List<Stage> getStagesInProgressForRequest(Long requestId) {
+    List<StageEntity> stageEntities = 
stageDAO.findByRequestIdAndCommandStatuses(requestId, 
HostRoleStatus.IN_PROGRESS_STATUSES);
+    return getStagesForEntities(stageEntities);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
   public List<Stage> getStagesInProgress() {
     List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
       HostRoleStatus.IN_PROGRESS_STATUSES);
+    return getStagesForEntities(stageEntities);
+  }
 
+  @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+  private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) {
     // experimentally enable parallel stage processing
     @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
     boolean useConcurrentStageProcessing = 
configuration.isExperimentalConcurrentStageProcessingEnabled();

http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e380ae4..8cbfb1e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -1139,10 +1139,10 @@ class ActionScheduler implements Runnable {
           cancelHostRoleCommands(tasksToDequeue, reason);
         }
 
-        // abort any stages in progress; don't execute this for all stages 
since
+        // abort any stages in progress that belong to this request; don't 
execute this for all stages since
         // that could lead to OOM errors on large requests, like those for
         // upgrades
-        List<Stage> stagesInProgress = db.getStagesInProgress();
+        List<Stage> stagesInProgress = 
db.getStagesInProgressForRequest(requestId);
         for (Stage stageInProgress : stagesInProgress) {
           abortOperationsForStage(stageInProgress);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8ef4a1b..d2f899f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -164,6 +164,16 @@ public class StageDAO {
   }
 
   @RequiresSession
+  public List<StageEntity> findByRequestIdAndCommandStatuses(Long requestId, 
Collection<HostRoleStatus> statuses) {
+    TypedQuery<StageEntity> query = 
entityManagerProvider.get().createNamedQuery(
+        "StageEntity.findByRequestIdAndCommandStatuses", StageEntity.class);
+
+    query.setParameter("requestId", requestId);
+    query.setParameter("statuses", statuses);
+    return daoUtils.selectList(query);
+  }
+
+  @RequiresSession
   public List<StageEntity> findByCommandStatuses(
       Collection<HostRoleStatus> statuses) {
     TypedQuery<StageEntity> query = 
entityManagerProvider.get().createNamedQuery(

http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index 7659a23..eaea913 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -42,7 +42,10 @@ import javax.persistence.Table;
 @NamedQueries({
     @NamedQuery(
         name = "StageEntity.findByCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT 
roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND 
roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, 
stage.stageId"),
+        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN 
(SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND 
roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, 
stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.findByRequestIdAndCommandStatuses",
+        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN 
(SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND 
roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId 
) ORDER BY stage.stageId"),
     @NamedQuery(
         name = "StageEntity.findIdsByRequestId",
         query = "SELECT stage.stageId FROM StageEntity stage WHERE 
stage.requestId = :requestId ORDER BY stage.stageId ASC") })

http://git-wip-us.apache.org/repos/asf/ambari/blob/21314666/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index ccb95a6..f86c02e 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2204,10 +2204,9 @@ public class TestActionScheduler {
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    long requestId = 1;
-
-    // create 3 stages, each with a single task - the first stage will be 
completed and should not
+    // Create a single request with 3 stages, each with a single task - the 
first stage will be completed and should not
     // be included when cancelling the unfinished tasks of the request
+    long requestId = 1;
     final List<Stage> allStages = new ArrayList<Stage>();
     final List<Stage> stagesInProgress = new ArrayList<Stage>();
     final List<HostRoleCommand> tasksInProgress = new ArrayList<>();
@@ -2219,7 +2218,7 @@ public class TestActionScheduler {
 
     Stage stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START,
-        Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId);
+        Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int) requestId);
 
     // complete the first stage
     
stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED);
@@ -2227,7 +2226,7 @@ public class TestActionScheduler {
 
     stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
-        Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId);
+        Service.Type.HDFS, namenodeCmdTaskId, 2, (int) requestId);
 
     tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
     stagesInProgress.add(stageWithTask);
@@ -2235,7 +2234,7 @@ public class TestActionScheduler {
 
     stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.DATANODE, RoleCommand.START,
-        Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId);
+        Service.Type.HDFS, datanodeCmdTaskId, 3, (int) requestId);
 
     tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
     stagesInProgress.add(stageWithTask);
@@ -2271,6 +2270,7 @@ public class TestActionScheduler {
 
     when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
     when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+    
when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress);
     when(db.getAllStages(anyLong())).thenReturn(allStages);
 
     List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();

Reply via email to