AMBARI-21370: Support VIPs instead of Host Names (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4d7cc7f3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4d7cc7f3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4d7cc7f3 Branch: refs/heads/feature-branch-AMBARI-21307 Commit: 4d7cc7f392a6c4b52d39456504ad490d74fd019a Parents: 4cd3150 Author: Jayush Luniya <jlun...@hortonworks.com> Authored: Thu Jun 29 07:17:24 2017 -0700 Committer: Jayush Luniya <jlun...@hortonworks.com> Committed: Thu Jun 29 07:17:24 2017 -0700 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 10 +- .../python/ambari_agent/alerts/base_alert.py | 8 +- .../python/ambari_agent/alerts/port_alert.py | 107 +++++++++++-------- .../ambari_agent/TestAlertSchedulerHandler.py | 17 +-- .../server/agent/AlertDefinitionCommand.java | 7 +- .../ambari/server/agent/HeartBeatHandler.java | 4 +- .../internal/AbstractProviderModule.java | 47 ++++++-- .../server/controller/jmx/JMXHostProvider.java | 13 +++ .../controller/jmx/JMXPropertyProvider.java | 25 +++++ .../org/apache/ambari/server/state/Cluster.java | 8 ++ .../server/state/alert/AlertDefinitionHash.java | 14 +-- .../server/state/cluster/ClusterImpl.java | 18 ++++ .../2.1.0.2.0/package/scripts/hdfs_namenode.py | 4 +- .../package/scripts/namenode_upgrade.py | 2 +- .../2.1.0.2.0/package/scripts/params_linux.py | 4 + .../metrics/JMXPropertyProviderTest.java | 9 ++ .../state/alerts/AlertDefinitionHashTest.java | 4 +- .../configs/ha_bootstrap_standby_node.json | 2 +- ...ha_bootstrap_standby_node_initial_start.json | 2 +- ...dby_node_initial_start_dfs_nameservices.json | 2 +- 20 files changed, 224 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index 6c1d29c..55c3d6e 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -283,6 +283,7 @@ class AlertSchedulerHandler(): for command_json in all_commands: clusterName = '' if not 'clusterName' in command_json else command_json['clusterName'] hostName = '' if not 'hostName' in command_json else command_json['hostName'] + publicHostName = '' if not 'publicHostName' in command_json else command_json['publicHostName'] clusterHash = None if not 'hash' in command_json else command_json['hash'] # cache the cluster and cluster hash after loading the JSON @@ -291,7 +292,7 @@ class AlertSchedulerHandler(): self._cluster_hashes[clusterName] = clusterHash for definition in command_json['alertDefinitions']: - alert = self.__json_to_callable(clusterName, hostName, definition) + alert = self.__json_to_callable(clusterName, hostName, publicHostName, definition) if alert is None: continue @@ -303,7 +304,7 @@ class AlertSchedulerHandler(): return definitions - def __json_to_callable(self, clusterName, hostName, json_definition): + def __json_to_callable(self, clusterName, hostName, publicHostName, json_definition): """ converts the json that represents all aspects of a definition and makes an object that extends BaseAlert that is used for individual @@ -336,7 +337,7 @@ class AlertSchedulerHandler(): alert = RecoveryAlert(json_definition, source, self.config, self.recovery_manger) if alert is not None: - alert.set_cluster(clusterName, hostName) + alert.set_cluster(clusterName, hostName, publicHostName) except Exception, exception: logger.exception("[AlertScheduler] Unable to load an invalid alert definition. It will be skipped.") @@ -402,8 +403,9 @@ class AlertSchedulerHandler(): clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName'] hostName = '' if not 'hostName' in execution_command else execution_command['hostName'] + publicHostName = '' if not 'publicHostName' in execution_command else execution_command['publicHostName'] - alert = self.__json_to_callable(clusterName, hostName, alert_definition) + alert = self.__json_to_callable(clusterName, hostName, publicHostName, alert_definition) if alert is None: continue http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index 7f3b2a5..add29fc 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -46,6 +46,7 @@ class BaseAlert(object): self.alert_source_meta = alert_source_meta self.cluster_name = '' self.host_name = '' + self.public_host_name = '' self.config = config def interval(self): @@ -86,10 +87,13 @@ class BaseAlert(object): self.cluster_configuration = cluster_configuration - def set_cluster(self, cluster_name, host_name): + def set_cluster(self, cluster_name, host_name, public_host_name = None): """ sets cluster information for the alert """ self.cluster_name = cluster_name self.host_name = host_name + self.public_host_name = host_name + if public_host_name: + self.public_host_name = public_host_name def _get_alert_meta_value_safely(self, meta_key): @@ -452,7 +456,7 @@ class BaseAlert(object): # get the host for dfs.namenode.http-address.c1ha.nn1 and see if it's # this host value = self._get_configuration_value(key) - if value is not None and self.host_name in value: + if value is not None and (self.host_name in value or self.public_host_name in value): return AlertUri(uri=value, is_ssl_enabled=is_ssl_enabled) return None http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py index 1e32718..02cc91c 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py @@ -91,7 +91,9 @@ class PortAlert(BaseAlert): # if not parameterized, this will return the static value uri_value = self._get_configuration_value(self.uri) + host_not_specified = False if uri_value is None: + host_not_specified = True uri_value = self.host_name logger.debug("[Alert][{0}] Setting the URI to this host since it wasn't specified".format( self.get_name())) @@ -112,6 +114,16 @@ class PortAlert(BaseAlert): host = BaseAlert.get_host_from_url(uri_value) if host is None or host == "localhost" or host == "0.0.0.0": host = self.host_name + host_not_specified = True + + hosts = [host] + # If host is not specified in the uri, hence we are using current host name + # then also add public host name as a fallback. + if host_not_specified and host.lower() == self.host_name.lower() \ + and self.host_name.lower() != self.public_host_name.lower(): + hosts.append(self.public_host_name) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("[Alert][{0}] List of hosts = {1}".format(self.get_name(), hosts)) try: port = int(get_port_from_url(uri_value)) @@ -122,51 +134,56 @@ class PortAlert(BaseAlert): port = self.default_port - - if logger.isEnabledFor(logging.DEBUG): - logger.debug("[Alert][{0}] Checking {1} on port {2}".format( - self.get_name(), host, str(port))) - - s = None - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(self.critical_timeout) - - if OSCheck.is_windows_family(): - # on windows 0.0.0.0 is invalid address to connect but on linux it resolved to 127.0.0.1 - host = resolve_address(host) - - start_time = time.time() - s.connect((host, port)) - if self.socket_command is not None: - s.sendall(self.socket_command) - data = s.recv(1024) - if self.socket_command_response is not None and data != self.socket_command_response: - raise Exception("Expected response {0}, Actual response {1}".format( - self.socket_command_response, data)) - end_time = time.time() - milliseconds = end_time - start_time - seconds = milliseconds / 1000.0 - - # not sure why this happens sometimes, but we don't always get a - # socket exception if the connect() is > than the critical threshold - if seconds >= self.critical_timeout: - return (self.RESULT_CRITICAL, ['Socket Timeout', host, port]) - - result = self.RESULT_OK - if seconds >= self.warning_timeout: - result = self.RESULT_WARNING - - return (result, [seconds, port]) - except Exception as e: - return (self.RESULT_CRITICAL, [str(e), host, port]) - finally: - if s is not None: - try: - s.close() - except: - # no need to log a close failure - pass + exceptions = [] + + for host in hosts: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("[Alert][{0}] Checking {1} on port {2}".format( + self.get_name(), host, str(port))) + + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(self.critical_timeout) + + if OSCheck.is_windows_family(): + # on windows 0.0.0.0 is invalid address to connect but on linux it resolved to 127.0.0.1 + host = resolve_address(host) + + start_time = time.time() + s.connect((host, port)) + if self.socket_command is not None: + s.sendall(self.socket_command) + data = s.recv(1024) + if self.socket_command_response is not None and data != self.socket_command_response: + raise Exception("Expected response {0}, Actual response {1}".format( + self.socket_command_response, data)) + end_time = time.time() + milliseconds = end_time - start_time + seconds = milliseconds / 1000.0 + + # not sure why this happens sometimes, but we don't always get a + # socket exception if the connect() is > than the critical threshold + if seconds >= self.critical_timeout: + return (self.RESULT_CRITICAL, ['Socket Timeout', host, port]) + + result = self.RESULT_OK + if seconds >= self.warning_timeout: + result = self.RESULT_WARNING + + return (result, [seconds, port]) + except Exception as e: + exceptions.append(e) + finally: + if s is not None: + try: + s.close() + except: + # no need to log a close failure + pass + + if exceptions: + return (self.RESULT_CRITICAL, [str(exceptions[0]), hosts[0], port]) def _get_reporting_text(self, state): ''' http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py index d1d27ef..fbcd33f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py @@ -70,7 +70,7 @@ class TestAlertSchedulerHandler(TestCase): } } - callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition)) + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition)) self.assertTrue(callable_result is not None) self.assertTrue(isinstance(callable_result, MetricAlert)) @@ -85,7 +85,7 @@ class TestAlertSchedulerHandler(TestCase): } } - callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition)) + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition)) self.assertTrue(callable_result is not None) self.assertTrue(isinstance(callable_result, AmsAlert)) @@ -100,7 +100,7 @@ class TestAlertSchedulerHandler(TestCase): } scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) - callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition)) + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition)) self.assertTrue(callable_result is not None) self.assertTrue(isinstance(callable_result, PortAlert)) @@ -116,7 +116,7 @@ class TestAlertSchedulerHandler(TestCase): } scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) - callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition)) + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition)) self.assertTrue(callable_result is not None) self.assertTrue(isinstance(callable_result, WebAlert)) @@ -131,7 +131,7 @@ class TestAlertSchedulerHandler(TestCase): } scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) - callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition)) + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 'host', copy.deepcopy(json_definition)) self.assertTrue(callable_result is None) @@ -174,6 +174,7 @@ class TestAlertSchedulerHandler(TestCase): { 'clusterName': 'cluster', 'hostName': 'host', + 'publicHostName' : 'host', 'alertDefinition': { 'name': 'alert1' } @@ -191,7 +192,7 @@ class TestAlertSchedulerHandler(TestCase): scheduler.execute_alert(execution_commands) - scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', {'name': 'alert1'}) + scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', 'host', {'name': 'alert1'}) self.assertTrue(alert_mock.collect.called) def test_execute_alert_from_extension(self): @@ -199,6 +200,7 @@ class TestAlertSchedulerHandler(TestCase): { 'clusterName': 'cluster', 'hostName': 'host', + 'publicHostName' : 'host', 'alertDefinition': { 'name': 'alert1' } @@ -216,7 +218,7 @@ class TestAlertSchedulerHandler(TestCase): scheduler.execute_alert(execution_commands) - scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', {'name': 'alert1'}) + scheduler._AlertSchedulerHandler__json_to_callable.assert_called_with('cluster', 'host', 'host', {'name': 'alert1'}) self.assertTrue(alert_mock.collect.called) def test_load_definitions(self): @@ -245,6 +247,7 @@ class TestAlertSchedulerHandler(TestCase): { 'clusterName': 'cluster', 'hostName': 'host', + 'publicHostName' : 'host', 'alertDefinition': { 'name': 'alert1' } http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java index 2929087..be837db 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java @@ -46,6 +46,9 @@ public class AlertDefinitionCommand extends AgentCommand { @SerializedName("hostName") private final String m_hostName; + @SerializedName("publicHostName") + private final String m_publicHostName; + @SerializedName("hash") private final String m_hash; @@ -61,17 +64,19 @@ public class AlertDefinitionCommand extends AgentCommand { * @param clusterName * the name of the cluster this response is for ( * @param hostName + * @param publicHostName * @param hash * @param definitions * * @see AlertDefinitionHash */ - public AlertDefinitionCommand(String clusterName, String hostName, + public AlertDefinitionCommand(String clusterName, String hostName, String publicHostName, String hash, List<AlertDefinition> definitions) { super(AgentCommandType.ALERT_DEFINITION_COMMAND); m_clusterName = clusterName; m_hostName = hostName; + m_publicHostName = publicHostName; m_hash = hash; m_definitions = definitions; } http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index 89ec963..1bc4c36 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -557,8 +557,10 @@ public class HeartBeatHandler { clusterName, hostname); String hash = alertDefinitionHash.getHash(clusterName, hostname); + Host host = cluster.getHost(hostname); + String publicHostName = host == null? hostname : host.getPublicHostName(); AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, - hostname, hash, definitions); + hostname, publicHostName, hash, definitions); command.addConfigs(configHelper, cluster); commands.add(command); http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java index 0e4f3f4..f3211bf 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java @@ -65,6 +65,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.DesiredConfig; +import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.Service; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -459,6 +460,12 @@ public abstract class AbstractProviderModule implements ProviderModule, } @Override + public String getPublicHostName(String clusterName, String hostName) { + Host host = getHost(clusterName, hostName); + return host == null? hostName : host.getPublicHostName(); + } + + @Override public Set<String> getHostNames(String clusterName, String componentName) { Set<String> hosts = null; try { @@ -472,6 +479,21 @@ public abstract class AbstractProviderModule implements ProviderModule, } @Override + public Host getHost(String clusterName, String hostName) { + Host host = null; + try { + Cluster cluster = managementController.getClusters().getCluster(clusterName); + if(cluster != null) { + host = cluster.getHost(hostName); + } + } catch (Exception e) { + LOG.warn("Exception in getting host info for jmx metrics: ", e); + } + return host; + } + + + @Override public boolean isCollectorComponentLive(String clusterName, MetricsService service) throws SystemException { final String collectorHostName = getCollectorHostName(clusterName, service); @@ -528,12 +550,14 @@ public abstract class AbstractProviderModule implements ProviderModule, serviceConfigTypes.get(service) ); + String publicHostName = getPublicHostName(clusterName, hostName); Map<String, String[]> componentPortsProperties = new HashMap<>(); componentPortsProperties.put( componentName, getPortProperties(service, componentName, hostName, + publicHostName, configProperties, httpsEnabled ) @@ -553,7 +577,7 @@ public abstract class AbstractProviderModule implements ProviderModule, } } - initRpcSuffixes(clusterName, componentName, configType, currVersion, hostName); + initRpcSuffixes(clusterName, componentName, configType, currVersion, hostName, publicHostName); } } catch (Exception e) { LOG.error("Exception initializing jmx port maps. ", e); @@ -575,8 +599,8 @@ public abstract class AbstractProviderModule implements ProviderModule, } /** - * Computes properties that contains proper port for {@code componentName} on {@code hostName}. Must contain custom logic - * for different configurations(like NAMENODE HA). + * Computes properties that contains proper port for {@code componentName} on {@code hostName}. + * Must contain custom logic for different configurations(like NAMENODE HA). * @param service service type * @param componentName component name * @param hostName host which contains requested component @@ -584,16 +608,20 @@ public abstract class AbstractProviderModule implements ProviderModule, * @param httpsEnabled indicates if https enabled for component * @return property name that contain port for {@code componentName} on {@code hostName} */ - String[] getPortProperties(Service.Type service, String componentName, String hostName, Map<String, Object> properties, boolean httpsEnabled) { + String[] getPortProperties(Service.Type service, String componentName, + String hostName, String publicHostName, Map<String, Object> properties, boolean httpsEnabled) { componentName = httpsEnabled ? componentName + "-HTTPS" : componentName; if(componentName.startsWith("NAMENODE") && properties.containsKey("dfs.internal.nameservices")) { componentName += "-HA"; - return getNamenodeHaProperty(properties, serviceDesiredProperties.get(service).get(componentName), hostName); + return getNamenodeHaProperty( + properties, serviceDesiredProperties.get(service).get(componentName), hostName, publicHostName); } return serviceDesiredProperties.get(service).get(componentName); } - private String[] getNamenodeHaProperty(Map<String, Object> properties, String pattern[], String hostName) { + private String[] getNamenodeHaProperty(Map<String, Object> properties, String pattern[], + String hostName, String publicHostName) { + // iterate over nameservices and namenodes, to find out namenode http(s) property for concrete host for(String nameserviceId : ((String)properties.get("dfs.internal.nameservices")).split(",")) { if(properties.containsKey("dfs.ha.namenodes."+nameserviceId)) { @@ -605,7 +633,8 @@ public abstract class AbstractProviderModule implements ProviderModule, ); if (properties.containsKey(propertyName)) { String propertyValue = (String)properties.get(propertyName); - if (propertyValue.split(":")[0].equals(hostName)) { + String propHostName = propertyValue.split(":")[0]; + if (propHostName.equals(hostName) || propHostName.equals(publicHostName)) { return new String[] {propertyName}; } } @@ -1181,7 +1210,7 @@ public abstract class AbstractProviderModule implements ProviderModule, private void initRpcSuffixes(String clusterName, String componentName, String config, String configVersion, - String hostName) + String hostName, String publicHostName) throws Exception { if (jmxDesiredRpcSuffixProperties.containsKey(componentName)) { Map<String, Map<String, String>> componentToPortsMap; @@ -1209,7 +1238,7 @@ public abstract class AbstractProviderModule implements ProviderModule, keys = jmxDesiredRpcSuffixProperties.get(componentName); Map<String, String[]> stringMap = jmxDesiredRpcSuffixProperties.get(componentName); for (String tag: stringMap.keySet()) { - keys.put(tag, getNamenodeHaProperty(configProperties, stringMap.get(tag), hostName)); + keys.put(tag, getNamenodeHaProperty(configProperties, stringMap.get(tag), hostName, publicHostName)); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java index cbeea1c..dbf8eb7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java @@ -17,6 +17,9 @@ */ package org.apache.ambari.server.controller.jmx; +import org.apache.ambari.server.controller.spi.SystemException; +import org.apache.ambari.server.state.Host; + import java.util.Set; import org.apache.ambari.server.controller.spi.SystemException; @@ -26,6 +29,8 @@ import org.apache.ambari.server.controller.spi.SystemException; */ public interface JMXHostProvider { + String getPublicHostName(String clusterName, String hostName); + /** * Get the JMX host names for the given cluster name and component name. * @@ -38,6 +43,14 @@ public interface JMXHostProvider { Set<String> getHostNames(String clusterName, String componentName); /** + * Get cluster host info given the host name + * @param clusterName + * @param hostName the host name + * @return the host info {@link Host} + */ + Host getHost(String clusterName, String hostName); + + /** * Get the port for the specified cluster name and component. * * @param clusterName the cluster name http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java index 870d1ef..e4de377 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java @@ -40,6 +40,7 @@ import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.utilities.StreamProvider; +import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.state.services.MetricsRetrievalService.MetricSourceType; import org.slf4j.Logger; @@ -254,6 +255,8 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { for (String hostName : hostNames) { try { String port = getPort(clusterName, componentName, hostName, httpsEnabled); + String publicHostName = jmxHostProvider.getPublicHostName(clusterName, hostName); + if (port == null) { LOG.warn("Unable to get JMX metrics. No port value for " + componentName); return resource; @@ -268,6 +271,17 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { // check to see if there is a cached value and use it if there is JMXMetricHolder jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(jmxUrl); + if( jmxMetricHolder == null && !hostName.equalsIgnoreCase(publicHostName)) { + // build the URL using public host name + String publicJmxUrl = getSpec(protocol, publicHostName, port, "/jmx"); + + // always submit a request to cache the latest data + metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, publicJmxUrl); + + // check to see if there is a cached value and use it if there is + jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(publicJmxUrl); + } + // if the ticket becomes invalid (timeout) then bail out if (!ticket.isValid()) { return resource; @@ -290,6 +304,17 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider { metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, adHocUrl); JMXMetricHolder adHocJMXMetricHolder = metricsRetrievalService.getCachedJMXMetric(adHocUrl); + if( adHocJMXMetricHolder == null && !hostName.equalsIgnoreCase(publicHostName)) { + // build the adhoc URL using public host name + String publicAdHocUrl = getSpec(protocol, publicHostName, port, queryURL); + + // always submit a request to cache the latest data + metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, publicAdHocUrl); + + // check to see if there is a cached value and use it if there is + adHocJMXMetricHolder = metricsRetrievalService.getCachedJMXMetric(publicAdHocUrl); + } + // if the ticket becomes invalid (timeout) then bail out if (!ticket.isValid()) { return resource; http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index b4ebcd8..b4f7120 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -133,6 +133,14 @@ public interface Cluster { */ Set<String> getHosts(String serviceName, String componentName); + /** + * Get specific host info using host name. + * + * @param hostName the host name + * @return Host info {@link Host} + */ + Host getHost(String hostName); + /** * Adds schs to cluster AND persists them http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java index a79b05d..15f7048 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java @@ -462,7 +462,7 @@ public class AlertDefinitionHash { hostNames.add(host.getHostName()); } - enqueueAgentCommands(clusterName, hostNames); + enqueueAgentCommands(cluster, clusterName, hostNames); } catch (AmbariException ae) { LOG.error("Unable to lookup cluster for alert definition commands", ae); } @@ -484,15 +484,16 @@ public class AlertDefinitionHash { */ public void enqueueAgentCommands(long clusterId, Collection<String> hosts) { String clusterName = null; + Cluster cluster = null; try { - Cluster cluster = m_clusters.get().getClusterById(clusterId); + cluster = m_clusters.get().getClusterById(clusterId); clusterName = cluster.getClusterName(); } catch (AmbariException ae) { LOG.error("Unable to lookup cluster for alert definition commands", ae); } - enqueueAgentCommands(clusterName, hosts); + enqueueAgentCommands(cluster, clusterName, hosts); } /** @@ -509,7 +510,7 @@ public class AlertDefinitionHash { * @param hosts * the hosts to push {@link AlertDefinitionCommand}s for. */ - private void enqueueAgentCommands(String clusterName, Collection<String> hosts) { + private void enqueueAgentCommands(Cluster cluster, String clusterName, Collection<String> hosts) { if (null == clusterName) { LOG.warn("Unable to create alert definition agent commands because of a null cluster name"); return; @@ -527,11 +528,12 @@ public class AlertDefinitionHash { String hash = getHash(clusterName, hostName); + Host host = cluster.getHost(hostName); + String publicHostName = host == null? hostName : host.getPublicHostName(); AlertDefinitionCommand command = new AlertDefinitionCommand( - clusterName, hostName, hash, definitions); + clusterName, hostName, publicHostName, hash, definitions); try { - Cluster cluster = m_clusters.get().getCluster(clusterName); command.addConfigs(m_configHelper.get(), cluster); } catch (AmbariException ae) { LOG.warn("Unable to add configurations to alert definition command", http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index a4bf815..06b6217 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -2124,6 +2124,24 @@ public class ClusterImpl implements Cluster { } @Override + public Host getHost(final String hostName) { + if (StringUtils.isEmpty(hostName)) { + return null; + } + + Collection<Host> hosts = getHosts(); + if(hosts != null) { + for (Host host : hosts) { + String hostString = host.getHostName(); + if(hostName.equalsIgnoreCase(hostString)) { + return host; + } + } + } + return null; + } + + @Override public Collection<Host> getHosts() { Map<String, Host> hosts; http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py index 139fe98..7226d22 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py @@ -115,7 +115,7 @@ def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, if params.dfs_ha_enabled and \ params.dfs_ha_namenode_standby is not None and \ - params.hostname == params.dfs_ha_namenode_standby: + (params.hostname == params.dfs_ha_namenode_standby or params.public_hostname == params.dfs_ha_namenode_standby): # if the current host is the standby NameNode in an HA deployment # run the bootstrap command, to start the NameNode in standby mode # this requires that the active NameNode is already up and running, @@ -332,7 +332,7 @@ def format_namenode(force=None): ) else: if params.dfs_ha_namenode_active is not None and \ - params.hostname == params.dfs_ha_namenode_active: + (params.hostname == params.dfs_ha_namenode_active or params.public_hostname == params.dfs_ha_namenode_active): # check and run the format command in the HA deployment scenario # only format the "active" namenode in an HA deployment if force: http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py index f683dcc..14d6ce2 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py @@ -47,7 +47,7 @@ def prepare_upgrade_check_for_previous_dir(): if params.dfs_ha_enabled: namenode_ha = NamenodeHAState() - if namenode_ha.is_active(params.hostname): + if namenode_ha.is_active(params.hostname) or namenode_ha.is_active(params.public_hostname): Logger.info("NameNode High Availability is enabled and this is the Active NameNode.") problematic_previous_namenode_dirs = set() http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py index 82fd950..a9fc179 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py @@ -170,6 +170,7 @@ klist_path_local = get_klist_path(default('/configurations/kerberos-env/executab kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) #hosts hostname = config["hostname"] +public_hostname = config["public_hostname"] rm_host = default("/clusterHostInfo/rm_host", []) slave_hosts = default("/clusterHostInfo/slave_hosts", []) oozie_servers = default("/clusterHostInfo/oozie_server", []) @@ -307,6 +308,9 @@ if dfs_ha_enabled: if hostname.lower() in nn_host.lower(): namenode_id = nn_id namenode_rpc = nn_host + elif public_hostname.lower() in nn_host.lower(): + namenode_id = nn_id + namenode_rpc = nn_host # With HA enabled namenode_address is recomputed namenode_address = format('hdfs://{dfs_ha_nameservices}') http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java index 7e0c66d..156ee66 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java @@ -53,6 +53,7 @@ import org.apache.ambari.server.security.authorization.AuthorizationException; import org.apache.ambari.server.security.authorization.AuthorizationHelperInitializer; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor; import org.junit.After; @@ -604,11 +605,19 @@ public class JMXPropertyProviderTest { this.unknownPort = unknownPort; } + @Override public String getPublicHostName(final String clusterName, final String hostName) { + return null; + } + @Override public Set<String> getHostNames(String clusterName, String componentName) { return null; } + @Override public Host getHost(final String clusterName, final String hostName) { + return null; + } + @Override public String getPort(String clusterName, String componentName, String hostName, boolean httpsEnabled) throws SystemException { return getPort(clusterName, componentName, hostName); http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java index e6e288e..4895d82 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java @@ -379,10 +379,10 @@ public class AlertDefinitionHashTest extends TestCase { ActionQueue actionQueue = m_injector.getInstance(ActionQueue.class); AlertDefinitionCommand definitionCommand1 = new AlertDefinitionCommand( - CLUSTERNAME, HOSTNAME, "12345", null); + CLUSTERNAME, HOSTNAME, HOSTNAME, "12345", null); AlertDefinitionCommand definitionCommand2 = new AlertDefinitionCommand( - CLUSTERNAME, "anotherHost", "67890", null); + CLUSTERNAME, "anotherHost", "anotherHost", "67890", null); AlertExecutionCommand executionCommand = new AlertExecutionCommand( CLUSTERNAME, HOSTNAME, null); http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json index 96f4d9d..df09021 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json +++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node.json @@ -36,7 +36,7 @@ "script_type": "PYTHON" }, "taskId": 93, - "public_hostname": "c6401.ambari.apache.org", + "public_hostname": "c6402.ambari.apache.org", "configurations": { "mapred-site": { "mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020", http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json index de2742f..a0a8f36 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json +++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start.json @@ -37,7 +37,7 @@ "phase": "INITIAL_START" }, "taskId": 93, - "public_hostname": "c6401.ambari.apache.org", + "public_hostname": "c6402.ambari.apache.org", "configurations": { "mapred-site": { "mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020", http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7cc7f3/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json index ba0fa8f..a3176bd 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json +++ b/ambari-server/src/test/python/stacks/2.0.6/configs/ha_bootstrap_standby_node_initial_start_dfs_nameservices.json @@ -37,7 +37,7 @@ "phase": "INITIAL_START" }, "taskId": 93, - "public_hostname": "c6401.ambari.apache.org", + "public_hostname": "c6402.ambari.apache.org", "configurations": { "mapred-site": { "mapreduce.jobhistory.address": "c6402.ambari.apache.org:10020",