This is an automated email from the ASF dual-hosted git repository. hapylestat pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 571dfe6 [AMBARI-23877] Service auto-start not working (dgrinenko) 571dfe6 is described below commit 571dfe62627e786efad31fa35702c61785004dab Author: Reishin <hapy.les...@gmail.com> AuthorDate: Fri May 18 03:44:39 2018 +0300 [AMBARI-23877] Service auto-start not working (dgrinenko) --- .../src/main/python/ambari_agent/ActionQueue.py | 19 +- .../src/main/python/ambari_agent/AmbariConfig.py | 8 +- .../python/ambari_agent/ComponentStatusExecutor.py | 1 - .../main/python/ambari_agent/InitializerModule.py | 24 +- .../main/python/ambari_agent/RecoveryManager.py | 337 ++++++++------------- .../python/ambari_agent/TestRecoveryManager.py | 119 +++++--- .../apache/ambari/server/agent/RecoveryConfig.java | 22 +- .../server/agent/RecoveryConfigComponent.java | 118 ++++++++ .../ambari/server/agent/RecoveryConfigHelper.java | 8 +- .../configuration/RecoveryConfigHelperTest.java | 53 +++- 10 files changed, 392 insertions(+), 317 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 65239ed..f0c996b 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -143,7 +143,7 @@ class ActionQueue(threading.Thread): if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - if command == None: + if command is None: break self.process_command(command) @@ -153,17 +153,16 @@ class ActionQueue(threading.Thread): while not self.stop_event.is_set(): command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - if command == None: + if command is None: break # If command is not retry_enabled then do not start them in parallel # checking just one command is enough as all commands for a stage is sent # at the same time and retry is only enabled for initial start/install - retryAble = False + retry_able = False if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']: - retryAble = command['commandParams']['command_retry_enabled'] == "true" - if retryAble: - logger.info("Kicking off a thread for the command, id=" + - str(command['commandId']) + " taskId=" + str(command['taskId'])) + retry_able = command['commandParams']['command_retry_enabled'] == "true" + if retry_able: + logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId'])) t = threading.Thread(target=self.process_command, args=(command,)) t.daemon = True t.start() @@ -172,14 +171,14 @@ class ActionQueue(threading.Thread): break pass pass - except (Queue.Empty): + except Queue.Empty: pass - except: + except Exception: logger.exception("ActionQueue thread failed with exception. Re-running it") logger.info("ActionQueue thread has successfully finished") def fillRecoveryCommands(self): - if not self.tasks_in_progress_or_pending(): + if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending(): self.put(self.recovery_manager.get_recovery_commands()) def processBackgroundQueueSafeEmpty(self): diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 1e95fbe..88aa8ea 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -186,21 +186,23 @@ class AmbariConfig: @property def cluster_cache_dir(self): return os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) - @property - def recovery_cache_dir(self): - return os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY) + @property def alerts_cachedir(self): return os.path.join(self.cache_dir, FileCache.ALERTS_CACHE_DIRECTORY) + @property def stacks_dir(self): return os.path.join(self.cache_dir, FileCache.STACKS_CACHE_DIRECTORY) + @property def common_services_dir(self): return os.path.join(self.cache_dir, FileCache.COMMON_SERVICES_DIRECTORY) + @property def extensions_dir(self): return os.path.join(self.cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY) + @property def host_scripts_dir(self): return os.path.join(self.cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY) diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index c9f86da..5d20495 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -154,7 +154,6 @@ class ComponentStatusExecutor(threading.Thread): self.send_updates_to_server({cluster_id: [result]}) return result - return None def send_updates_to_server(self, cluster_reports): diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 052e8c1..787df16 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -46,6 +46,7 @@ from ambari_agent.AlertStatusReporter import AlertStatusReporter logger = logging.getLogger(__name__) + class InitializerModule: """ - Instantiate some singleton classes or widely used instances along with providing their dependencies. @@ -56,6 +57,23 @@ class InitializerModule: def __init__(self): self.stop_event = threading.Event() self.config = AmbariConfig.get_resolved_config() + + self.is_registered = None + self.metadata_cache = None + self.topology_cache = None + self.host_level_params_cache = None + self.configurations_cache = None + self.alert_definitions_cache = None + self.configuration_builder = None + self.stale_alerts_monitor = None + self.server_responses_listener = None + self.file_cache = None + self.customServiceOrchestrator = None + self.recovery_manager = None + self.commandStatuses = None + self.action_queue = None + self.alert_scheduler_handler = None + self.init() def init(self): @@ -71,14 +89,10 @@ class InitializerModule: self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir) self.configuration_builder = ConfigurationBuilder(self) self.stale_alerts_monitor = StaleAlertsMonitor(self) - self.server_responses_listener = ServerResponsesListener() - self.file_cache = FileCache(self.config) - self.customServiceOrchestrator = CustomServiceOrchestrator(self) - - self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir) + self.recovery_manager = RecoveryManager() self.commandStatuses = CommandStatusDict(self) self.init_threads() diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index 3a20b20..f7ec134 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -15,10 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import copy -import os import time import threading import pprint @@ -29,17 +25,17 @@ from ambari_agent.LiveStatus import LiveStatus logger = logging.getLogger() -""" -RecoveryManager has the following capabilities: -* Store data needed for execution commands extracted from STATUS command -* Generate INSTALL command -* Generate START command -""" - class RecoveryManager: + """ + RecoveryManager has the following capabilities: + * Store data needed for execution commands extracted from STATUS command + * Generate INSTALL command + * Generate START command + """ COMMAND_TYPE = "commandType" PAYLOAD_LEVEL = "payloadLevel" + SERVICE_NAME = "serviceName" COMPONENT_NAME = "componentName" ROLE = "role" TASK_ID = "taskId" @@ -57,7 +53,7 @@ class RecoveryManager: INIT = "INIT" # TODO: What is the state when machine is reset INSTALL_FAILED = "INSTALL_FAILED" COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME" - COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes + COMMAND_REFRESH_DELAY_SEC = 600 FILENAME = "recovery.json" @@ -77,13 +73,15 @@ class RecoveryManager: "stale_config": False } - def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False): + def __init__(self, recovery_enabled=False, auto_start_only=False, auto_install_start=False): self.recovery_enabled = recovery_enabled self.auto_start_only = auto_start_only self.auto_install_start = auto_install_start self.max_count = 6 self.window_in_min = 60 self.retry_gap = 5 + self.window_in_sec = self.window_in_min * 60 + self.retry_gap_in_sec = self.retry_gap * 60 self.max_lifetime_count = 12 self.id = int(time.time()) @@ -91,6 +89,7 @@ class RecoveryManager: self.allowed_current_states = [self.INIT, self.INSTALLED] self.enabled_components = [] self.statuses = {} + self.__component_to_service_map = {} # component => service map TODO: fix it later(hack here) self.__status_lock = threading.RLock() self.__command_lock = threading.RLock() self.__active_command_lock = threading.RLock() @@ -98,29 +97,16 @@ class RecoveryManager: self.active_command_count = 0 self.cluster_id = None - if not os.path.exists(cache_dir): - try: - os.makedirs(cache_dir) - except: - logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir)) - - self.__actions_json_file = os.path.join(cache_dir, self.FILENAME) - self.actions = {} - self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "") - pass - def on_execution_command_start(self): with self.__active_command_lock: self.active_command_count += 1 - pass def on_execution_command_finish(self): with self.__active_command_lock: self.active_command_count -= 1 - pass def has_active_command(self): return self.active_command_count > 0 @@ -131,12 +117,10 @@ class RecoveryManager: def get_current_status(self, component): if component in self.statuses: return self.statuses[component]["current"] - pass def get_desired_status(self, component): if component in self.statuses: return self.statuses[component]["desired"] - pass def update_config_staleness(self, component, is_config_stale): """ @@ -154,17 +138,10 @@ class RecoveryManager: pass self.statuses[component]["stale_config"] = is_config_stale - pass def handle_status_change(self, component, component_status): - if not self.enabled() or not self.configured_for_recovery(component): - return - - if component_status == LiveStatus.LIVE_STATUS: - self.update_current_status(component, component_status) - else: - if (self.get_current_status(component) != self.INSTALL_FAILED): - self.update_current_status(component, component_status) + if component_status == LiveStatus.LIVE_STATUS or self.get_current_status(component) != self.INSTALL_FAILED: + self.update_current_status(component, component_status) def update_current_status(self, component, state): """ @@ -184,9 +161,8 @@ class RecoveryManager: if self.statuses[component]["current"] != state: logger.info("current status is set to %s for %s", state, component) - self.statuses[component]["current"] = state - pass + self.statuses[component]["current"] = state def update_desired_status(self, component, state): """ @@ -202,21 +178,16 @@ class RecoveryManager: logger.info("New status, desired status is set to %s for %s", self.statuses[component]["desired"], component) finally: self.__status_lock.release() - pass if self.statuses[component]["desired"] != state: logger.info("desired status is set to %s for %s", state, component) self.statuses[component]["desired"] = state - pass - """ - Whether specific components are enabled for recovery. - """ def configured_for_recovery(self, component): - if len(self.enabled_components) > 0 and component in self.enabled_components: - return True - - return False + """ + Whether specific components are enabled for recovery. + """ + return len(self.enabled_components) > 0 and component in self.enabled_components def requires_recovery(self, component): """ @@ -225,23 +196,15 @@ class RecoveryManager: INIT --> INSTALLED --> STARTED RE-INSTALLED (if configs do not match) """ - if not self.enabled(): - return False - - if not self.configured_for_recovery(component): - return False - - if component not in self.statuses: + if not self.enabled() or not self.configured_for_recovery(component) or component not in self.statuses: return False status = self.statuses[component] if self.auto_start_only or self.auto_install_start: - if status["current"] == status["desired"]: - return False - if status["desired"] not in self.allowed_desired_states: + if status["current"] == status["desired"] or status["desired"] not in self.allowed_desired_states: return False else: - if status["current"] == status["desired"] and status['stale_config'] == False: + if status["current"] == status["desired"] and status['stale_config'] is False: return False if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states: @@ -249,9 +212,6 @@ class RecoveryManager: logger.info("%s needs recovery, desired = %s, and current = %s.", component, status["desired"], status["current"]) return True - pass - - def get_recovery_status(self): """ @@ -268,8 +228,7 @@ class RecoveryManager: ] } """ - report = {} - report["summary"] = "DISABLED" + report = {"summary": "DISABLED"} if self.enabled(): report["summary"] = "RECOVERABLE" num_limits_reached = 0 @@ -279,24 +238,23 @@ class RecoveryManager: try: for component in self.actions.keys(): action = self.actions[component] - recovery_state = {} - recovery_state["name"] = component - recovery_state["numAttempts"] = action["lifetimeCount"] - recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"] + recovery_state = { + "name": component, + "numAttempts": action["lifetimeCount"], + "limitReached": self.max_lifetime_count <= action["lifetimeCount"] + } recovery_states.append(recovery_state) - if recovery_state["limitReached"] == True: + if recovery_state["limitReached"] is True: num_limits_reached += 1 - pass finally: self.__status_lock.release() - if num_limits_reached > 0: + if num_limits_reached > 0 and num_limits_reached == len(recovery_states): + report["summary"] = "UNRECOVERABLE" + elif num_limits_reached > 0: report["summary"] = "PARTIALLY_RECOVERABLE" - if num_limits_reached == len(recovery_states): - report["summary"] = "UNRECOVERABLE" return report - pass def get_recovery_commands(self): """ @@ -308,39 +266,34 @@ class RecoveryManager: """ commands = [] for component in self.statuses.keys(): - if self.requires_recovery(component) and self.may_execute(component): + if self.configured_for_recovery(component) and self.requires_recovery(component) and self.may_execute(component): status = copy.deepcopy(self.statuses[component]) command = None if self.auto_start_only: - if status["desired"] == self.STARTED: - if status["current"] == self.INSTALLED: - command = self.get_start_command(component) + if status["desired"] == self.STARTED and status["current"] == self.INSTALLED: + command = self.get_start_command(component) elif self.auto_install_start: - if status["desired"] == self.STARTED: - if status["current"] == self.INSTALLED: - command = self.get_start_command(component) - elif status["current"] == self.INSTALL_FAILED: - command = self.get_install_command(component) - elif status["desired"] == self.INSTALLED: - if status["current"] == self.INSTALL_FAILED: + if status["desired"] == self.STARTED and status["current"] == self.INSTALLED: + command = self.get_start_command(component) + elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED: command = self.get_install_command(component) else: # START, INSTALL, RESTART if status["desired"] != status["current"]: - if status["desired"] == self.STARTED: - if status["current"] == self.INSTALLED: - command = self.get_start_command(component) - elif status["current"] == self.INIT: - command = self.get_install_command(component) - elif status["current"] == self.INSTALL_FAILED: - command = self.get_install_command(component) - elif status["desired"] == self.INSTALLED: - if status["current"] == self.INIT: - command = self.get_install_command(component) - elif status["current"] == self.INSTALL_FAILED: - command = self.get_install_command(component) - elif status["current"] == self.STARTED: - command = self.get_stop_command(component) + if status["desired"] == self.STARTED and status["current"] == self.INSTALLED: + command = self.get_start_command(component) + elif status["desired"] == self.STARTED and status["current"] == self.INIT: + command = self.get_install_command(component) + elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED and status["current"] == self.INIT: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED and status["current"] == self.STARTED: + command = self.get_stop_command(component) else: if status["current"] == self.INSTALLED: command = self.get_install_command(component) @@ -349,11 +302,10 @@ class RecoveryManager: if command: self.execute(component) - logger.info("Created recovery command %s for component %s", - command[self.ROLE_COMMAND], command[self.ROLE]) + logger.info("Created recovery command %s for component %s", command[self.ROLE_COMMAND], command[self.ROLE]) commands.append(command) - return commands + return commands def may_execute(self, action): """ @@ -369,8 +321,6 @@ class RecoveryManager: finally: self.__status_lock.release() return self._execute_action_chk_only(action) - pass - def execute(self, action): """ @@ -386,8 +336,6 @@ class RecoveryManager: finally: self.__status_lock.release() return self._execute_action_(action) - pass - def _execute_action_(self, action_name): """ @@ -398,7 +346,7 @@ class RecoveryManager: executed = False seconds_since_last_attempt = now - action_counter["lastAttempt"] if action_counter["lifetimeCount"] < self.max_lifetime_count: - #reset if window_in_sec seconds passed since last attempt + # reset if window_in_sec seconds passed since last attempt if seconds_since_last_attempt > self.window_in_sec: action_counter["count"] = 0 action_counter["lastReset"] = now @@ -406,7 +354,7 @@ class RecoveryManager: if action_counter["count"] < self.max_count: if seconds_since_last_attempt > self.retry_gap_in_sec: action_counter["count"] += 1 - action_counter["lifetimeCount"] +=1 + action_counter["lifetimeCount"] += 1 if self.retry_gap > 0: action_counter["lastAttempt"] = now action_counter["warnedLastAttempt"] = False @@ -414,28 +362,27 @@ class RecoveryManager: action_counter["lastReset"] = now executed = True else: - if action_counter["warnedLastAttempt"] == False: + if action_counter["warnedLastAttempt"] is False: action_counter["warnedLastAttempt"] = True logger.warn( "%s seconds has not passed since last occurrence %s seconds back for %s. " + "Will silently skip execution without warning till retry gap is passed", self.retry_gap_in_sec, seconds_since_last_attempt, action_name) else: - logger.debug( - "%s seconds has not passed since last occurrence %s seconds back for %s", - self.retry_gap_in_sec, seconds_since_last_attempt, action_name) + logger.debug("%s seconds has not passed since last occurrence %s seconds back for %s", + self.retry_gap_in_sec, seconds_since_last_attempt, action_name) else: sec_since_last_reset = now - action_counter["lastReset"] if sec_since_last_reset > self.window_in_sec: action_counter["count"] = 1 - action_counter["lifetimeCount"] +=1 + action_counter["lifetimeCount"] += 1 if self.retry_gap > 0: action_counter["lastAttempt"] = now action_counter["lastReset"] = now action_counter["warnedLastReset"] = False executed = True else: - if action_counter["warnedLastReset"] == False: + if action_counter["warnedLastReset"] is False: action_counter["warnedLastReset"] = True logger.warn("%s occurrences in %s minutes reached the limit for %s. " + "Will silently skip execution without warning till window is reset", @@ -444,7 +391,7 @@ class RecoveryManager: logger.debug("%s occurrences in %s minutes reached the limit for %s", action_counter["count"], self.window_in_min, action_name) else: - if action_counter["warnedThresholdReached"] == False: + if action_counter["warnedThresholdReached"] is False: action_counter["warnedThresholdReached"] = True logger.warn("%s occurrences in agent life time reached the limit for %s. " + "Will silently skip execution without warning till window is reset", @@ -452,47 +399,7 @@ class RecoveryManager: else: logger.error("%s occurrences in agent life time reached the limit for %s", action_counter["lifetimeCount"], action_name) - self._dump_actions() return executed - pass - - - def _dump_actions(self): - """ - Dump recovery actions to FS - """ - self.__cache_lock.acquire() - try: - with open(self.__actions_json_file, 'w') as f: - json.dump(self.actions, f, indent=2) - except Exception, exception: - logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file)) - return False - finally: - self.__cache_lock.release() - - return True - pass - - - def _load_actions(self): - """ - Loads recovery actions from FS - """ - self.__cache_lock.acquire() - - try: - if os.path.isfile(self.__actions_json_file): - with open(self.__actions_json_file, 'r') as fp: - return json.load(fp) - except Exception, exception: - logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file)) - finally: - self.__cache_lock.release() - - return {} - pass - def get_actions_copy(self): """ @@ -503,8 +410,6 @@ class RecoveryManager: return copy.deepcopy(self.actions) finally: self.__status_lock.release() - pass - def is_action_info_stale(self, action_name): """ @@ -518,7 +423,6 @@ class RecoveryManager: seconds_since_last_attempt = now - action_counter["lastAttempt"] return seconds_since_last_attempt > self.window_in_sec return False - pass def _execute_action_chk_only(self, action_name): """ @@ -538,12 +442,9 @@ class RecoveryManager: return True return False - pass def _now_(self): return int(time.time()) - pass - def update_recovery_config(self, dictionary): """ @@ -564,8 +465,7 @@ class RecoveryManager: window_in_min = 60 retry_gap = 5 max_lifetime_count = 12 - enabled_components = "" - + enabled_components = [] if dictionary and "recoveryConfig" in dictionary: if logger.isEnabledFor(logging.INFO): @@ -593,27 +493,26 @@ class RecoveryManager: self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only, auto_install_start, enabled_components) - pass - """ - Update recovery configuration with the specified values. - - max_count - Configured maximum count of recovery attempt allowed per host component in a window. - window_in_min - Configured window size in minutes. - retry_gap - Configured retry gap between tries per host component - max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component. - recovery_enabled - True or False. Indicates whether recovery is enabled or not. - auto_start_only - True if AUTO_START recovery type was specified. False otherwise. - auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise. - enabled_components - CSV of componenents enabled for auto start. - """ def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only, auto_install_start, enabled_components): """ + Update recovery configuration with the specified values. + + max_count - Configured maximum count of recovery attempt allowed per host component in a window. + window_in_min - Configured window size in minutes. + retry_gap - Configured retry gap between tries per host component + max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component. + recovery_enabled - True or False. Indicates whether recovery is enabled or not. + auto_start_only - True if AUTO_START recovery type was specified. False otherwise. + auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise. + enabled_components - CSV of componenents enabled for auto start. + + Update recovery configuration, recovery is disabled if configuration values are not correct """ - self.recovery_enabled = False; + self.recovery_enabled = False if max_count <= 0: logger.warn("Recovery disabled: max_count must be a non-negative number") return @@ -653,10 +552,17 @@ class RecoveryManager: self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED] if enabled_components is not None and len(enabled_components) > 0: - components = enabled_components.split(",") - for component in components: - if len(component.strip()) > 0: - self.enabled_components.append(component.strip()) + components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components] + for service, component, state in components: + self.enabled_components.append(component) + self.update_desired_status(component, state) + + # Recovery Manager is Component oriented, however Agent require Service and component name to build properly + # commands. As workaround, we pushing service name from the server and keeping it relation at agent. + # + # However it important to keep map actual, for this reason relation could be updated if service will + # push another service <-> component relation + self.__component_to_service_map[component] = service self.recovery_enabled = recovery_enabled if self.recovery_enabled: @@ -665,8 +571,6 @@ class RecoveryManager: " lifetime max being %s. Enabled components - %s", self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count, ', '.join(self.enabled_components)) - pass - def get_unique_task_id(self): self.id += 1 @@ -679,33 +583,31 @@ class RecoveryManager: if not self.enabled(): return - if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']): + if self.ROLE_COMMAND not in command or not self.configured_for_recovery(command['role']): return if status == ActionQueue.COMPLETED_STATUS: if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START: self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS) - #self.update_config_staleness(command['role'], False) - logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) + - ", current state of " + command[self.ROLE] + " to " + - self.get_current_status(command[self.ROLE]) ) + logger.info("After EXECUTION_COMMAND (START), with taskId={}, current state of {} to {}".format( + command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE]))) + elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL: self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS) - logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) + - ", current state of " + command[self.ROLE] + " to " + - self.get_current_status(command[self.ROLE]) ) + logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId={}, current state of {} to {}".format( + command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE]))) + elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND: - if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART: + if 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART: self.update_current_status(command['role'], LiveStatus.LIVE_STATUS) - #self.update_config_staleness(command['role'], False) - logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " + - self.get_current_status(command[self.ROLE]) ) + logger.info("After EXECUTION_COMMAND (RESTART), current state of {} to {}".format( + command[self.ROLE], self.get_current_status(command[self.ROLE]))) + elif status == ActionQueue.FAILED_STATUS: if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL: self.update_current_status(command[self.ROLE], self.INSTALL_FAILED) - logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) + - ", current state of " + command[self.ROLE] + " to " + - self.get_current_status(command[self.ROLE])) + logger.info("After EXECUTION_COMMAND (INSTALL), with taskId={}, current state of {} to {}".format( + command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE]))) def process_execution_command(self, command): """ @@ -714,28 +616,30 @@ class RecoveryManager: if not self.enabled(): return - if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND: + if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND: return - if not self.ROLE in command: + if self.ROLE not in command: return if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \ and self.configured_for_recovery(command[self.ROLE]): + self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS) - logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) - elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \ - and self.configured_for_recovery(command[self.ROLE]): + logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of {} to {}".format( + command[self.ROLE], self.get_desired_status(command[self.ROLE]))) + + elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START and self.configured_for_recovery(command[self.ROLE]): self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) - logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) - elif command.has_key('custom_command') and \ - command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \ + logger.info("Received EXECUTION_COMMAND (START), desired state of {} to {}".format( + command[self.ROLE], self.get_desired_status(command[self.ROLE]))) + + elif 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \ and self.configured_for_recovery(command[self.ROLE]): + self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) - logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) + logger.info("Received EXECUTION_COMMAND (RESTART), desired state of {} to {}".format( + command[self.ROLE], self.get_desired_status(command[self.ROLE]))) def get_command(self, component, command_name): """ @@ -755,6 +659,10 @@ class RecoveryManager: self.ROLE: component, self.COMMAND_ID: command_id } + + if component in self.__component_to_service_map: + command[self.SERVICE_NAME] = self.__component_to_service_map[component] + return command else: logger.info("Recovery is not enabled. START command will not be computed.") @@ -786,12 +694,3 @@ class RecoveryManager: except ValueError: pass return int_value - - -def main(argv=None): - cmd_mgr = RecoveryManager('/tmp') - pass - - -if __name__ == '__main__': - main() diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py index b9800bb..432e74b 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py +++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py @@ -116,14 +116,8 @@ class _TestRecoveryManager(TestCase): } } - def setUp(self): - pass - - def tearDown(self): - pass - def test_defaults(self): - rm = RecoveryManager(tempfile.mktemp()) + rm = RecoveryManager() self.assertFalse(rm.enabled()) self.assertEqual(None, rm.get_install_command("NODEMANAGER")) self.assertEqual(None, rm.get_start_command("NODEMANAGER")) @@ -131,7 +125,6 @@ class _TestRecoveryManager(TestCase): rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") self.assertFalse(rm.requires_recovery("NODEMANAGER")) - pass @patch.object(RecoveryManager, "_now_") def test_sliding_window(self, time_mock): @@ -139,7 +132,7 @@ class _TestRecoveryManager(TestCase): [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401, 1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301] - rm = RecoveryManager(tempfile.mktemp(), True, False) + rm = RecoveryManager(True, False) self.assertTrue(rm.enabled()) config = rm.update_config(0, 60, 5, 12, True, False, False, "") @@ -206,11 +199,12 @@ class _TestRecoveryManager(TestCase): # lifetime max reached self.assertTrue(rm.execute("NODEMANAGER2")) self.assertFalse(rm.execute("NODEMANAGER2")) - pass def test_recovery_required(self): - rm = RecoveryManager(tempfile.mktemp(), True, False) - rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER") + rm = RecoveryManager(True, False) + rm.update_config(12, 5, 1, 15, True, False, False, [ + {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'} + ]) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "INSTALLED") self.assertFalse(rm.requires_recovery("NODEMANAGER")) @@ -239,7 +233,7 @@ class _TestRecoveryManager(TestCase): rm.update_desired_status("NODEMANAGER", "STARTED") self.assertTrue(rm.requires_recovery("NODEMANAGER")) - rm = RecoveryManager(tempfile.mktemp(), True, True) + rm = RecoveryManager(True, True) rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", "INSTALLED") @@ -253,18 +247,20 @@ class _TestRecoveryManager(TestCase): rm.update_desired_status("NODEMANAGER", "START") self.assertFalse(rm.requires_recovery("NODEMANAGER")) - pass - def test_recovery_required2(self): - rm = RecoveryManager(tempfile.mktemp(), True, True) - rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER") + rm = RecoveryManager(True, True) + rm.update_config(15, 5, 1, 16, True, False, False, [ + {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'} + ]) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") self.assertTrue(rm.requires_recovery("NODEMANAGER")) - rm = RecoveryManager(tempfile.mktemp(), True, True) - rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER") + rm = RecoveryManager( True, True) + rm.update_config(15, 5, 1, 16, True, False, False, [ + {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'} + ]) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") self.assertTrue(rm.requires_recovery("NODEMANAGER")) @@ -273,7 +269,7 @@ class _TestRecoveryManager(TestCase): rm.update_desired_status("DATANODE", "STARTED") self.assertFalse(rm.requires_recovery("DATANODE")) - rm = RecoveryManager(tempfile.mktemp(), True, True) + rm = RecoveryManager(True, True) rm.update_config(15, 5, 1, 16, True, False, False, "") rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") @@ -283,7 +279,9 @@ class _TestRecoveryManager(TestCase): rm.update_desired_status("DATANODE", "STARTED") self.assertFalse(rm.requires_recovery("DATANODE")) - rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER") + rm.update_config(15, 5, 1, 16, True, False, False, [ + {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'} + ]) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") self.assertTrue(rm.requires_recovery("NODEMANAGER")) @@ -291,31 +289,30 @@ class _TestRecoveryManager(TestCase): rm.update_current_status("DATANODE", "INSTALLED") rm.update_desired_status("DATANODE", "STARTED") self.assertFalse(rm.requires_recovery("DATANODE")) - pass @patch.object(RecoveryManager, "update_config") def test_update_rm_config(self, mock_uc): - rm = RecoveryManager(tempfile.mktemp()) + rm = RecoveryManager() rm.update_recovery_config(None) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])]) mock_uc.reset_mock() rm.update_recovery_config({}) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])]) mock_uc.reset_mock() rm.update_recovery_config( {"recoveryConfig": { "type" : "DEFAULT"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])]) mock_uc.reset_mock() rm.update_recovery_config( {"recoveryConfig": { "type" : "FULL"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, [])]) mock_uc.reset_mock() rm.update_recovery_config( @@ -323,7 +320,7 @@ class _TestRecoveryManager(TestCase): "type" : "AUTO_START", "max_count" : "med"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, [])]) mock_uc.reset_mock() rm.update_recovery_config( @@ -331,28 +328,41 @@ class _TestRecoveryManager(TestCase): "type" : "AUTO_INSTALL_START", "max_count" : "med"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "")]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, [])]) mock_uc.reset_mock() rm.update_recovery_config( {"recoveryConfig": { - "type" : "AUTO_START", - "maxCount" : "5", + "type": "AUTO_START", + "maxCount": "5", "windowInMinutes" : 20, - "retryGap" : 2, + "retryGap": 2, "maxLifetimeCount" : 5, - "components" : " A,B", - "recoveryTimestamp" : 1}} + "components": [ + { + "service_name": "A", + "component_name": "A", + "desired_state": "INSTALLED" + }, + { + "service_name": "B", + "component_name": "B", + "desired_state": "INSTALLED" + } + ], + "recoveryTimestamp": 1}} ) - mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B")]) - pass + mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, [ + {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}, + {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'} + ])]) @patch.object(RecoveryManager, "_now_") def test_recovery_report(self, time_mock): time_mock.side_effect = \ [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1815] - rm = RecoveryManager(tempfile.mktemp()) + rm = RecoveryManager() rec_st = rm.get_recovery_status() self.assertEquals(rec_st, {"summary": "DISABLED"}) @@ -392,20 +402,21 @@ class _TestRecoveryManager(TestCase): {"name": "LION", "numAttempts": 4, "limitReached": True}, {"name": "PUMA", "numAttempts": 4, "limitReached": True} ]}) - pass @patch.object(RecoveryManager, "_now_") def test_command_expiry(self, time_mock): time_mock.side_effect = \ [1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812] - rm = RecoveryManager(tempfile.mktemp(), True) + rm = RecoveryManager(True) rm.update_config(5, 5, 0, 11, True, False, False, "") command1 = copy.deepcopy(self.command) #rm.store_or_update_command(command1) - rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER") + rm.update_config(12, 5, 1, 15, True, False, False, [ + {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'} + ]) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") @@ -426,28 +437,38 @@ class _TestRecoveryManager(TestCase): commands = rm.get_recovery_commands() self.assertEqual(1, len(commands)) self.assertEqual("START", commands[0]["roleCommand"]) - pass def test_configured_for_recovery(self): - rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(12, 5, 1, 15, True, False, False, "A,B") + rm = RecoveryManager(True) + rm.update_config(12, 5, 1, 15, True, False, False, [ + {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}, + {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'}, + ]) self.assertTrue(rm.configured_for_recovery("A")) self.assertTrue(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, False, "") + rm.update_config(5, 5, 1, 11, True, False, False, []) self.assertFalse(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, False, "A") + rm.update_config(5, 5, 1, 11, True, False, False, [ + {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'} + ]) self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, False, "A") + rm.update_config(5, 5, 1, 11, True, False, False, [ + {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'} + ]) self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) self.assertFalse(rm.configured_for_recovery("C")) - rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ") + rm.update_config(5, 5, 1, 11, True, False, False, [ + {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}, + {'component_name': 'D', 'service_name': 'D', 'desired_state': 'INSTALLED'}, + {'component_name': 'F', 'service_name': 'F', 'desired_state': 'INSTALLED'} + ]) self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) self.assertFalse(rm.configured_for_recovery("C")) @@ -459,7 +480,7 @@ class _TestRecoveryManager(TestCase): def test_reset_if_window_passed_since_last_attempt(self, time_mock): time_mock.side_effect = \ [1000, 1071, 1372] - rm = RecoveryManager(tempfile.mktemp(), True) + rm = RecoveryManager(True) rm.update_config(2, 5, 1, 4, True, True, False, "") @@ -478,7 +499,7 @@ class _TestRecoveryManager(TestCase): @patch.object(RecoveryManager, "_now_") def test_is_action_info_stale(self, time_mock): - rm = RecoveryManager(tempfile.mktemp(), True) + rm = RecoveryManager(True) rm.update_config(5, 60, 5, 16, True, False, False, "") time_mock.return_value = 0 diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java index 8e2078d..9e3e455 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java @@ -18,8 +18,10 @@ package org.apache.ambari.server.agent; -import com.google.gson.annotations.SerializedName; +import java.util.List; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; /** * Recovery config to be sent to the agent @@ -33,34 +35,34 @@ public class RecoveryConfig { } @SerializedName("type") - @com.fasterxml.jackson.annotation.JsonProperty("type") + @JsonProperty("type") private String type; @SerializedName("maxCount") - @com.fasterxml.jackson.annotation.JsonProperty("maxCount") + @JsonProperty("maxCount") private String maxCount; @SerializedName("windowInMinutes") - @com.fasterxml.jackson.annotation.JsonProperty("windowInMinutes") + @JsonProperty("windowInMinutes") private String windowInMinutes; @SerializedName("retryGap") - @com.fasterxml.jackson.annotation.JsonProperty("retryGap") + @JsonProperty("retryGap") private String retryGap; @SerializedName("maxLifetimeCount") - @com.fasterxml.jackson.annotation.JsonProperty("maxLifetimeCount") + @JsonProperty("maxLifetimeCount") private String maxLifetimeCount; @SerializedName("components") - @com.fasterxml.jackson.annotation.JsonProperty("components") - private String enabledComponents; + @JsonProperty("components") + private List<RecoveryConfigComponent> enabledComponents; - public String getEnabledComponents() { + public List<RecoveryConfigComponent> getEnabledComponents() { return enabledComponents; } - public void setEnabledComponents(String enabledComponents) { + public void setEnabledComponents(List<RecoveryConfigComponent> enabledComponents) { this.enabledComponents = enabledComponents; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java new file mode 100644 index 0000000..50f13b4 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.agent; + +import java.util.Objects; + +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.State; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +/** + * Holder for component + */ +public class RecoveryConfigComponent{ + + @SerializedName("component_name") + @JsonProperty("component_name") + private String componentName; + + @SerializedName("service_name") + @JsonProperty("service_name") + private String serviceName; + + @SerializedName("desired_state") + @JsonProperty("desired_state") + private String desiredState; + + /** + * Creates new instance of {@link RecoveryConfigComponent} + * @param componentName name of the component + * @param desiredState desired desiredState of the component + */ + public RecoveryConfigComponent(String componentName, String serviceName, State desiredState){ + this.setComponentName(componentName); + this.setServiceName(serviceName); + this.setDesiredState(desiredState); + } + + /** + * Creates {@link RecoveryConfigComponent} instance from initialized {@link ServiceComponentHost} + */ + public RecoveryConfigComponent(ServiceComponentHost sch) { + this(sch.getServiceComponentName(), sch.getServiceName(), sch.getDesiredState()); + } + + public String getComponentName() { + return componentName; + } + + public void setComponentName(String componentName) { + this.componentName = componentName; + } + + public State getDesiredState() { + return State.valueOf(desiredState); + } + + + public void setDesiredState(State state) { + this.desiredState = state.toString(); + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder("{") + .append("componentName=").append(componentName) + .append(", serviceName=").append(serviceName) + .append(", desiredState=").append(desiredState) + .append("}"); + return sb.toString(); + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public boolean equals(Object o){ + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final RecoveryConfigComponent that = (RecoveryConfigComponent) o; + return Objects.equals(componentName, that.componentName) && + Objects.equals(serviceName, that.serviceName) && + Objects.equals(desiredState, that.desiredState); + } + + @Override + public int hashCode(){ + return Objects.hash(componentName, serviceName, desiredState); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java index 668b65e..fbd8a7f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java @@ -108,7 +108,7 @@ public class RecoveryConfigHelper { recoveryConfig.setType(autoStartConfig.getNodeRecoveryType()); recoveryConfig.setWindowInMinutes(autoStartConfig.getNodeRecoveryWindowInMin()); if (autoStartConfig.isRecoveryEnabled()) { - recoveryConfig.setEnabledComponents(StringUtils.join(autoStartConfig.getEnabledComponents(hostname), ',')); + recoveryConfig.setEnabledComponents(autoStartConfig.getEnabledComponents(hostname)); } return recoveryConfig; @@ -316,8 +316,8 @@ public class RecoveryConfigHelper { * maintenance mode. * @return */ - private List<String> getEnabledComponents(String hostname) throws AmbariException { - List<String> enabledComponents = new ArrayList<>(); + private List<RecoveryConfigComponent> getEnabledComponents(String hostname) throws AmbariException { + List<RecoveryConfigComponent> enabledComponents = new ArrayList<>(); if (cluster == null) { return enabledComponents; @@ -343,7 +343,7 @@ public class RecoveryConfigHelper { if (service.getMaintenanceState() == MaintenanceState.OFF) { // Keep the components that are not in maintenance mode. if (sch.getMaintenanceState() == MaintenanceState.OFF) { - enabledComponents.add(sch.getServiceComponentName()); + enabledComponents.add(new RecoveryConfigComponent(sch)); } } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java index fb53b1f..738e06e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -35,6 +36,7 @@ import java.util.Set; import org.apache.ambari.server.H2DatabaseCleaner; import org.apache.ambari.server.agent.HeartbeatTestHelper; import org.apache.ambari.server.agent.RecoveryConfig; +import org.apache.ambari.server.agent.RecoveryConfigComponent; import org.apache.ambari.server.agent.RecoveryConfigHelper; import org.apache.ambari.server.controller.internal.DeleteHostComponentStatusMetaData; import org.apache.ambari.server.orm.GuiceJpaInitializer; @@ -53,6 +55,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.eventbus.EventBus; import com.google.inject.Guice; @@ -145,8 +148,7 @@ public class RecoveryConfigHelperTest { * @throws Exception */ @Test - public void testServiceComponentInstalled() - throws Exception { + public void testServiceComponentInstalled() throws Exception { Cluster cluster = heartbeatTestHelper.getDummyCluster(); RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster); @@ -157,7 +159,9 @@ public class RecoveryConfigHelperTest { // Get the recovery configuration RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); // Install HDFS::NAMENODE to trigger a component installed event hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true); @@ -165,7 +169,10 @@ public class RecoveryConfigHelperTest { // Verify the new config recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT), + new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); } /** @@ -188,14 +195,19 @@ public class RecoveryConfigHelperTest { // Get the recovery configuration RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT), + new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); // Uninstall HDFS::DATANODE from host1 hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).delete(new DeleteHostComponentStatusMetaData()); // Verify the new config recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); } /** @@ -216,7 +228,9 @@ public class RecoveryConfigHelperTest { // Get the recovery configuration RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INSTALLED) + ), recoveryConfig.getEnabledComponents()); // Get cluser-env config and turn off recovery for the cluster Config config = cluster.getDesiredConfigByType("cluster-env"); @@ -238,8 +252,7 @@ public class RecoveryConfigHelperTest { * @throws Exception */ @Test - public void testMaintenanceModeChanged() - throws Exception { + public void testMaintenanceModeChanged() throws Exception { Cluster cluster = heartbeatTestHelper.getDummyCluster(); RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster); Service hdfs = cluster.addService(HDFS, repositoryVersion); @@ -252,13 +265,18 @@ public class RecoveryConfigHelperTest { // Get the recovery configuration RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT), + new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setMaintenanceState(MaintenanceState.ON); // Only NAMENODE is left recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); } /** @@ -267,8 +285,7 @@ public class RecoveryConfigHelperTest { * @throws Exception */ @Test - public void testServiceComponentRecoveryChanged() - throws Exception { + public void testServiceComponentRecoveryChanged() throws Exception { Cluster cluster = heartbeatTestHelper.getDummyCluster(); RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster); Service hdfs = cluster.addService(HDFS, repositoryVersion); @@ -278,14 +295,16 @@ public class RecoveryConfigHelperTest { // Get the recovery configuration RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); // Turn off auto start for HDFS::DATANODE hdfs.getServiceComponent(DATANODE).setRecoveryEnabled(false); // Get the latest config. DATANODE should not be present. recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1); - assertEquals(recoveryConfig.getEnabledComponents(), ""); + assertEquals(new ArrayList<RecoveryConfigComponent>(), recoveryConfig.getEnabledComponents()); } /** @@ -319,7 +338,9 @@ public class RecoveryConfigHelperTest { // Simulate registration for Host1: Get the recovery configuration right away for Host1. // It makes an entry for cluster name and Host1 in the timestamp dictionary. RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), "Host1"); - assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE"); + assertEquals(Lists.newArrayList( + new RecoveryConfigComponent(DATANODE, HDFS, State.INIT) + ), recoveryConfig.getEnabledComponents()); // Simulate heartbeat for Host2: When second host heartbeats, it first checks if config stale. // This should return true since it did not get the configuration during registration. -- To stop receiving notification emails like this one, please contact hapyles...@apache.org.