This is an automated email from the ASF dual-hosted git repository. mpapirkovskyy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new e4bc80a AMBARI-23898. Server posts numerous redundant metadata updates to agent. (#1325) e4bc80a is described below commit e4bc80afe44ba067a460acd32e02728244607b14 Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org> AuthorDate: Fri May 18 21:20:54 2018 +0300 AMBARI-23898. Server posts numerous redundant metadata updates to agent. (#1325) * AMBARI-23898. Server posts numerous redundant metadata updates to agent. (mpapirkovskyy) * AMBARI-23898. Server posts numerous redundant metadata updates to agent. (mpapirkovskyy) --- .../apache/ambari/server/agent/RecoveryConfig.java | 28 +++++++++++++++++ .../server/agent/stomp/HostLevelParamsHolder.java | 35 ++++++++++++++++++++-- .../ambari/server/agent/stomp/MetadataHolder.java | 18 ++++------- .../server/agent/stomp/dto/HostRepositories.java | 16 +++++----- .../server/agent/stomp/dto/MetadataCluster.java | 29 ++++++++++++++++++ .../ambari/server/configuration/Configuration.java | 4 +-- .../controller/AmbariManagementControllerImpl.java | 31 +++---------------- .../server/events/HostLevelParamsUpdateEvent.java | 4 +++ 8 files changed, 112 insertions(+), 53 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java index 8f909cd..8e2078d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java @@ -105,6 +105,34 @@ public class RecoveryConfig { } @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RecoveryConfig that = (RecoveryConfig) o; + + if (type != null ? !type.equals(that.type) : that.type != null) return false; + if (maxCount != null ? !maxCount.equals(that.maxCount) : that.maxCount != null) return false; + if (windowInMinutes != null ? !windowInMinutes.equals(that.windowInMinutes) : that.windowInMinutes != null) + return false; + if (retryGap != null ? !retryGap.equals(that.retryGap) : that.retryGap != null) return false; + if (maxLifetimeCount != null ? !maxLifetimeCount.equals(that.maxLifetimeCount) : that.maxLifetimeCount != null) + return false; + return enabledComponents != null ? enabledComponents.equals(that.enabledComponents) : that.enabledComponents == null; + } + + @Override + public int hashCode() { + int result = type != null ? type.hashCode() : 0; + result = 31 * result + (maxCount != null ? maxCount.hashCode() : 0); + result = 31 * result + (windowInMinutes != null ? windowInMinutes.hashCode() : 0); + result = 31 * result + (retryGap != null ? retryGap.hashCode() : 0); + result = 31 * result + (maxLifetimeCount != null ? maxLifetimeCount.hashCode() : 0); + result = 31 * result + (enabledComponents != null ? enabledComponents.hashCode() : 0); + return result; + } + + @Override public String toString() { StringBuilder buffer = new StringBuilder("RecoveryConfig{"); buffer.append(", type=").append(type); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java index abd78c8..fbd26dd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.agent.stomp; import java.util.Collection; +import java.util.Map; import java.util.TreeMap; import org.apache.ambari.server.AmbariException; @@ -30,6 +31,7 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Host; +import org.apache.commons.collections.MapUtils; import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; @@ -78,9 +80,36 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp } protected boolean handleUpdate(HostLevelParamsUpdateEvent update) throws AmbariException { - //TODO implement update host level params process - setData(update, update.getHostId()); - return true; + boolean changed = false; + if (MapUtils.isNotEmpty(update.getHostLevelParamsClusters())) { + Long hostId = update.getHostId(); + for (Map.Entry<String, HostLevelParamsCluster> hostLevelParamsClusterEntry : update.getHostLevelParamsClusters().entrySet()) { + HostLevelParamsCluster updatedCluster = hostLevelParamsClusterEntry.getValue(); + String clusterId = hostLevelParamsClusterEntry.getKey(); + Map<String, HostLevelParamsCluster> clusters = getData().get(hostId).getHostLevelParamsClusters(); + if (clusters.containsKey(clusterId)) { + HostLevelParamsCluster cluster = clusters.get(clusterId); + if (!cluster.getRecoveryConfig().equals(updatedCluster.getRecoveryConfig())) { + cluster.setRecoveryConfig(updatedCluster.getRecoveryConfig()); + changed = true; + } + if (!cluster.getHostRepositories().getRepositories() + .equals(updatedCluster.getHostRepositories().getRepositories())) { + cluster.getHostRepositories().setRepositories(updatedCluster.getHostRepositories().getRepositories()); + changed = true; + } + if (!cluster.getHostRepositories().getComponentRepos() + .equals(updatedCluster.getHostRepositories().getComponentRepos())) { + cluster.getHostRepositories().setComponentRepos(updatedCluster.getHostRepositories().getComponentRepos()); + changed = true; + } + } else { + clusters.put(clusterId, updatedCluster); + changed = true; + } + } + } + return changed; } @Override diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java index b3558cd..3d8ee35 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java @@ -26,7 +26,6 @@ import org.apache.ambari.server.agent.stomp.dto.MetadataCluster; import org.apache.ambari.server.controller.AmbariManagementControllerImpl; import org.apache.ambari.server.events.AmbariPropertiesChangedEvent; import org.apache.ambari.server.events.ClusterComponentsRepoChangedEvent; -import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.events.MetadataUpdateEvent; import org.apache.ambari.server.events.ServiceCredentialStoreUpdateEvent; import org.apache.ambari.server.events.ServiceInstalledEvent; @@ -34,6 +33,7 @@ import org.apache.ambari.server.events.UpdateEventType; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import com.google.common.eventbus.Subscribe; @@ -85,15 +85,14 @@ public class MetadataHolder extends AgentClusterDataHolder<MetadataUpdateEvent> changed = true; } else { MetadataCluster cluster = clusters.get(clusterId); - if (!cluster.getClusterLevelParams().equals(updatedCluster.getClusterLevelParams())) { - cluster.getClusterLevelParams().putAll(updatedCluster.getClusterLevelParams()); + if (cluster.updateClusterLevelParams(updatedCluster.getClusterLevelParams())) { changed = true; } - if (!cluster.getServiceLevelParams().equals(updatedCluster.getServiceLevelParams())) { - cluster.getServiceLevelParams().putAll(updatedCluster.getServiceLevelParams()); + if (cluster.updateServiceLevelParams(updatedCluster.getServiceLevelParams())) { changed = true; } - if (!cluster.getStatusCommandsToRun().equals(updatedCluster.getStatusCommandsToRun())) { + if (CollectionUtils.isNotEmpty(updatedCluster.getStatusCommandsToRun()) + && !cluster.getStatusCommandsToRun().containsAll(updatedCluster.getStatusCommandsToRun())) { cluster.getStatusCommandsToRun().addAll(updatedCluster.getStatusCommandsToRun()); changed = true; } @@ -117,13 +116,6 @@ public class MetadataHolder extends AgentClusterDataHolder<MetadataUpdateEvent> } @Subscribe - public void onConfigsChange(ClusterConfigChangedEvent configChangedEvent) throws AmbariException { - Cluster cluster = m_clusters.get().getCluster(configChangedEvent.getClusterName()); - updateData(ambariManagementController.getClusterMetadataOnConfigsUpdate(cluster)); - - } - - @Subscribe public void onServiceCreate(ServiceInstalledEvent serviceInstalledEvent) throws AmbariException { Cluster cluster = m_clusters.get().getCluster(serviceInstalledEvent.getClusterId()); updateData(ambariManagementController.getClusterMetadataOnServiceInstall(cluster, serviceInstalledEvent.getServiceName())); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java index 37d1146..1e63812 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java @@ -19,7 +19,7 @@ package org.apache.ambari.server.agent.stomp.dto; -import java.util.Map; +import java.util.SortedMap; import org.apache.ambari.server.agent.CommandRepository; @@ -30,29 +30,29 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class HostRepositories { @JsonProperty("commandRepos") - private Map<Long, CommandRepository> repositories; + private SortedMap<Long, CommandRepository> repositories; @JsonProperty("componentRepos") - private Map<String, Long> componentRepos; + private SortedMap<String, Long> componentRepos; - public HostRepositories(Map<Long, CommandRepository> repositories, Map<String, Long> componentRepos) { + public HostRepositories(SortedMap<Long, CommandRepository> repositories, SortedMap<String, Long> componentRepos) { this.repositories = repositories; this.componentRepos = componentRepos; } - public Map<Long, CommandRepository> getRepositories() { + public SortedMap<Long, CommandRepository> getRepositories() { return repositories; } - public void setRepositories(Map<Long, CommandRepository> repositories) { + public void setRepositories(SortedMap<Long, CommandRepository> repositories) { this.repositories = repositories; } - public Map<String, Long> getComponentRepos() { + public SortedMap<String, Long> getComponentRepos() { return componentRepos; } - public void setComponentRepos(Map<String, Long> componentRepos) { + public void setComponentRepos(SortedMap<String, Long> componentRepos) { this.componentRepos = componentRepos; } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java index bb3604e..b22ee60 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java @@ -24,6 +24,7 @@ import java.util.SortedMap; import java.util.TreeMap; import org.apache.ambari.server.state.SecurityType; +import org.apache.commons.lang.StringUtils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -78,6 +79,34 @@ public class MetadataCluster { this.agentConfigs = agentConfigs; } + public boolean updateServiceLevelParams(SortedMap<String, MetadataServiceInfo> update) { + boolean changed = false; + for (String key : update.keySet()) { + if (!serviceLevelParams.containsKey(key) || !serviceLevelParams.get(key).equals(update.get(key))) { + changed = true; + break; + } + } + if (changed) { + serviceLevelParams.putAll(update); + } + return changed; + } + + public boolean updateClusterLevelParams(SortedMap<String, String> update) { + boolean changed = false; + for (String key : update.keySet()) { + if (!clusterLevelParams.containsKey(key) || !StringUtils.equals(clusterLevelParams.get(key), update.get(key))) { + changed = true; + break; + } + } + if (changed) { + clusterLevelParams.putAll(update); + } + return changed; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index badb79e..91eafe5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -1802,7 +1802,7 @@ public class Configuration { */ @Markdown(description = "Thread pool size for spring messaging") public static final ConfigurationProperty<Integer> MESSAGING_THREAD_POOL_SIZE = new ConfigurationProperty<>( - "messaging.threadpool.size", 1); + "messaging.threadpool.size", 10); /** * The thread pool size for agents registration. @@ -4512,7 +4512,7 @@ public class Configuration { } /** - * @return max thread pool size for clients, default 25 + * @return max thread pool size for clients, default 10 */ public int getSpringMessagingThreadPoolSize() { return Integer.parseInt(getProperty(MESSAGING_THREAD_POOL_SIZE)); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 0c22b42..64360d7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -5779,31 +5779,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle clusterLevelParams.put(STACK_NAME, stackId.getStackName()); clusterLevelParams.put(STACK_VERSION, stackId.getStackVersion()); - Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs(); - if (MapUtils.isNotEmpty(desiredConfigs)) { - - Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredConfigs); - String userList = gson.toJson(userSet); - clusterLevelParams.put(USER_LIST, userList); - - //Create a user_group mapping and send it as part of the hostLevelParams - Map<String, Set<String>> userGroupsMap = configHelper.createUserGroupsMap( - stackId, cluster, desiredConfigs); - String userGroups = gson.toJson(userGroupsMap); - clusterLevelParams.put(USER_GROUPS, userGroups); - - Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredConfigs); - String groupList = gson.toJson(groupSet); - clusterLevelParams.put(GROUP_LIST, groupList); - } - Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId, - PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredConfigs); - String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet); - clusterLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList); - + clusterLevelParams.putAll(getMetadataClusterLevelConfigsParams(cluster, stackId)); clusterLevelParams.put(CLUSTER_NAME, cluster.getClusterName()); - - StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); clusterLevelParams.put(HOOKS_FOLDER, configs.getProperty(Configuration.HOOKS_FOLDER)); return clusterLevelParams; @@ -5812,7 +5789,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle public TreeMap<String, String> getMetadataClusterLevelConfigsParams(Cluster cluster, StackId stackId) throws AmbariException { TreeMap<String, String> clusterLevelParams = new TreeMap<>(); - Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs(); + Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs(false); if (MapUtils.isNotEmpty(desiredConfigs)) { Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredConfigs); @@ -5928,8 +5905,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle @Override public HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException { List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(host.getHostName()); - Map<Long, CommandRepository> hostRepositories = new HashMap<>(); - Map<String, Long> componentsRepos = new HashMap<>(); + SortedMap<Long, CommandRepository> hostRepositories = new TreeMap<>(); + SortedMap<String, Long> componentsRepos = new TreeMap<>(); for (ServiceComponentHost serviceComponentHost : hostComponents) { CommandRepository commandRepository; diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java index d2fc257..facce46 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java @@ -82,6 +82,10 @@ public class HostLevelParamsUpdateEvent extends STOMPHostEvent implements Hashab return hostId; } + public Map<String, HostLevelParamsCluster> getHostLevelParamsClusters() { + return hostLevelParamsClusters; + } + @Override public boolean equals(Object o) { if (this == o) return true; -- To stop receiving notification emails like this one, please contact mpapirkovs...@apache.org.