This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d497004  AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (#2143)
d497004 is described below

commit d497004d39639102d4092509ef8d955840bf3ecc
Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org>
AuthorDate: Wed Aug 22 14:23:37 2018 +0300

    AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (#2143)
    
    * AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
    
    * AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
---
 .../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++-
 .../events/listeners/tasks/TaskStatusListener.java | 46 ++++++++++++++++------
 2 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3543486..1a055b3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -436,8 +436,18 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
     TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
     taskEventPublisher.publish(taskCreateEvent);
     List<HostRoleCommandEntity> hostRoleCommandEntities = 
hostRoleCommandDAO.findByRequest(requestEntity.getRequestId());
-    STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
-        hostRoleCommandDAO, topologyManager, clusterName, 
hostRoleCommandEntities));
+
+    // "requests" STOMP topic is used for clusters related requests only.
+    // Requests without clusters (like host checks) should be posted to 
divided topic.
+    if (clusterName != null) {
+      STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
+          hostRoleCommandDAO, topologyManager, clusterName, 
hostRoleCommandEntities));
+    } else {
+      LOG.debug("No STOMP request update event was fired for new request due 
no cluster related, " +
+              "request id: {}, command name: {}",
+          requestEntity.getRequestId(),
+          requestEntity.getCommandName());
+    }
   }
 
   @Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 0570fdf..b188729 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.ambari.server.ClusterNotFoundException;
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -123,7 +122,7 @@ public class TaskStatusListener {
    * @param event Consumes {@link TaskUpdateEvent}.
    */
   @Subscribe
-  public void onTaskUpdateEvent(TaskUpdateEvent event) throws 
ClusterNotFoundException {
+  public void onTaskUpdateEvent(TaskUpdateEvent event) {
     LOG.debug("Received task update event {}", event);
     List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
     List<HostRoleCommand>  hostRoleCommandWithReceivedStatus =  new 
ArrayList<>();
@@ -145,13 +144,27 @@ public class TaskStatusListener {
         requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
 
         if 
(!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus()))
 {
-          Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new 
HashSet<>();
-          hostRoleCommands.add(new 
RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
-              hostRoleCommand.getRequestId(),
-              hostRoleCommand.getStatus(),
-              hostRoleCommand.getHostName()));
-          requestsToPublish.add(new 
RequestUpdateEvent(hostRoleCommand.getRequestId(),
-              
activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), 
hostRoleCommands));
+          // Ignore requests not related to any cluster. "requests" topic is 
used for cluster requests only.
+          Long clusterId = 
activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
+          if (clusterId != null && clusterId != -1) {
+            Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new 
HashSet<>();
+            hostRoleCommands.add(new 
RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
+                hostRoleCommand.getRequestId(),
+                hostRoleCommand.getStatus(),
+                hostRoleCommand.getHostName()));
+            requestsToPublish.add(new 
RequestUpdateEvent(hostRoleCommand.getRequestId(),
+                
activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), 
hostRoleCommands));
+          } else {
+            LOG.debug("No STOMP request update event was fired for host 
component status change due no cluster related, " +
+                    "request id: {}, role: {}, role command: {}, host: {}, 
task id: {}, old state: {}, new state: {}",
+                hostRoleCommand.getRequestId(),
+                hostRoleCommand.getRole(),
+                hostRoleCommand.getRoleCommand(),
+                hostRoleCommand.getHostName(),
+                hostRoleCommand.getTaskId(),
+                activeTasksMap.get(reportedTaskId).getStatus(),
+                hostRoleCommand.getStatus());
+          }
         }
       }
     }
@@ -264,7 +277,8 @@ public class TaskStatusListener {
       // Request entity of the hostrolecommand should be persisted before 
publishing task create event
       assert requestEntity != null;
       Set<StageEntityPK> stageEntityPKs =  Sets.newHashSet(stageEntityPK);
-      ActiveRequest request = new 
ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), 
stageEntityPKs);
+      ActiveRequest request = new 
ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
+          stageEntityPKs, requestEntity.getClusterId());
       activeRequestMap.put(requestId, request);
     }
   }
@@ -524,11 +538,14 @@ public class TaskStatusListener {
     private HostRoleStatus status;
     private HostRoleStatus displayStatus;
     private Set <StageEntityPK> stageEntityPks;
+    private Long clusterId;
 
-    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, 
Set<StageEntityPK> stageEntityPks) {
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, 
Set<StageEntityPK> stageEntityPks,
+                         Long clusterId) {
       this.status = status;
       this.displayStatus = displayStatus;
       this.stageEntityPks = stageEntityPks;
+      this.clusterId = clusterId;
     }
 
     public HostRoleStatus getStatus() {
@@ -559,6 +576,13 @@ public class TaskStatusListener {
       stageEntityPks.add(stageEntityPK);
     }
 
+    public Long getClusterId() {
+      return clusterId;
+    }
+
+    public void setClusterId(Long clusterId) {
+      this.clusterId = clusterId;
+    }
   }
 
   /**

Reply via email to