This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 2c57b45 AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk) 2c57b45 is described below commit 2c57b457caee50e67c747dfd4ae6d5a2d83e9dac Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Fri May 18 13:31:02 2018 +0300 AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk) --- .../src/main/python/ambari_agent/ActionQueue.py | 9 +++ .../python/ambari_agent/ComponentStatusExecutor.py | 77 +++++++++++++--------- .../main/python/ambari_agent/HeartbeatThread.py | 4 +- .../main/python/ambari_agent/InitializerModule.py | 19 ++++++ ambari-agent/src/main/python/ambari_agent/main.py | 39 +++-------- .../resource_management/core/providers/accounts.py | 14 +++- 6 files changed, 100 insertions(+), 62 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 4ac5d67..65239ed 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -83,6 +83,7 @@ class ActionQueue(threading.Thread): self.tmpdir = self.config.get('agent', 'prefix') self.customServiceOrchestrator = initializer_module.customServiceOrchestrator self.parallel_execution = self.config.get_parallel_exec_option() + self.component_status_executor = initializer_module.component_status_executor if self.parallel_execution == 1: logger.info("Parallel execution is enabled, will execute agent commands in parallel") self.lock = threading.Lock() @@ -421,6 +422,14 @@ class ActionQueue(threading.Thread): self.recovery_manager.process_execution_command_result(command, status) self.commandStatuses.put_command_status(command, roleResult) + cluster_id = str(command['clusterId']) + + if cluster_id != '-1' and cluster_id != 'null': + service_name = command['serviceName'] + if service_name != 'null': + component_name = command['role'] + self.component_status_executor.check_component_status(clusterId, service_name, component_name, "STATUS", report=True) + def log_command_output(self, text, taskId): """ Logs a message as multiple enumerated log messages every of which is not larger than MAX_SYMBOLS_PER_LOG_MESSAGE. diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index e2c73bd..c9f86da 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -99,39 +99,10 @@ class ComponentStatusExecutor(threading.Thread): logger.info("Skipping status command for {0}. Since command for it is running".format(component_name)) continue - command_dict = { - 'serviceName': service_name, - 'role': component_name, - 'clusterId': cluster_id, - 'commandType': 'STATUS_COMMAND', - } + result = self.check_component_status(cluster_id, service_name, component_name, command_name) - component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict) - status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS - - # if exec command for component started to run after status command completion - if self.customServiceOrchestrator.commandsRunningForComponent(cluster_id, component_name): - logger.info("Skipped status command result for {0}. Since command for it is running".format(component_name)) - continue - - # log if status command failed - if status == LiveStatus.DEAD_STATUS: - stderr = component_status_result['stderr'] - if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr: - logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr)) - - result = { - 'serviceName': service_name, - 'componentName': component_name, - 'command': command_name, - 'status': status, - 'clusterId': cluster_id, - } - - if status != self.reported_component_status[cluster_id][component_name][command_name]: - logging.info("Status for {0} has changed to {1}".format(component_name, status)) + if result: cluster_reports[cluster_id].append(result) - self.recovery_manager.handle_status_change(component_name, status) self.send_updates_to_server(cluster_reports) except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue @@ -142,6 +113,50 @@ class ComponentStatusExecutor(threading.Thread): self.stop_event.wait(self.status_commands_run_interval) logger.info("ComponentStatusExecutor has successfully finished") + def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False): + """ + Returns components status if it has changed, otherwise None. + """ + + # if not a component + if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None: + return None + + command_dict = { + 'serviceName': service_name, + 'role': component_name, + 'clusterId': cluster_id, + 'commandType': 'STATUS_COMMAND', + } + + component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict) + status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS + + # log if status command failed + if status == LiveStatus.DEAD_STATUS: + stderr = component_status_result['stderr'] + if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr: + logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr)) + + result = { + 'serviceName': service_name, + 'componentName': component_name, + 'command': command_name, + 'status': status, + 'clusterId': cluster_id, + } + + if status != self.reported_component_status[cluster_id][component_name][command_name]: + logging.info("Status for {0} has changed to {1}".format(component_name, status)) + self.recovery_manager.handle_status_change(component_name, status) + + if report: + self.send_updates_to_server({cluster_id: [result]}) + + return result + + return None + def send_updates_to_server(self, cluster_reports): if not cluster_reports or not self.initializer_module.is_registered: return diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 01bf3c5..179cf6b 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -78,7 +78,9 @@ class HeartbeatThread(threading.Thread): self.responseId = 0 self.file_cache = initializer_module.file_cache self.stale_alerts_monitor = initializer_module.stale_alerts_monitor - self.post_registration_actions = [self.file_cache.reset] + self.post_registration_actions = [self.file_cache.reset, initializer_module.component_status_executor.clean_not_existing_clusters_info, + initializer_module.alert_status_reporter.clean_not_existing_clusters_info, initializer_module.host_status_reporter.clean_cache] + def run(self): diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 61c1e65..052e8c1 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -38,6 +38,12 @@ from ambari_agent.StaleAlertsMonitor import StaleAlertsMonitor from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener +from ambari_agent import HeartbeatThread +from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor +from ambari_agent.CommandStatusReporter import CommandStatusReporter +from ambari_agent.HostStatusReporter import HostStatusReporter +from ambari_agent.AlertStatusReporter import AlertStatusReporter + logger = logging.getLogger(__name__) class InitializerModule: @@ -74,8 +80,21 @@ class InitializerModule: self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir) self.commandStatuses = CommandStatusDict(self) + + self.init_threads() + + + def init_threads(self): + """ + Initialize thread objects + """ + self.component_status_executor = ComponentStatusExecutor(self) self.action_queue = ActionQueue(self) self.alert_scheduler_handler = AlertSchedulerHandler(self) + self.command_status_reporter = CommandStatusReporter(self) + self.host_status_reporter = HostStatusReporter(self) + self.alert_status_reporter = AlertStatusReporter(self) + self.heartbeat_thread = HeartbeatThread.HeartbeatThread(self) @property def connection(self): diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 693b04e..b7b9042 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -106,12 +106,7 @@ from resource_management.core.logger import Logger #from resource_management.core.resources.system import File #from resource_management.core.environment import Environment -from ambari_agent import HeartbeatThread from ambari_agent.InitializerModule import InitializerModule -from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor -from ambari_agent.CommandStatusReporter import CommandStatusReporter -from ambari_agent.HostStatusReporter import HostStatusReporter -from ambari_agent.AlertStatusReporter import AlertStatusReporter #logging.getLogger('ambari_agent').propagate = False @@ -360,25 +355,11 @@ MAX_RETRIES = 10 def run_threads(initializer_module): initializer_module.alert_scheduler_handler.start() - - heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module) - heartbeat_thread.start() - - component_status_executor = ComponentStatusExecutor(initializer_module) - component_status_executor.start() - - command_status_reporter = CommandStatusReporter(initializer_module) - command_status_reporter.start() - - host_status_reporter = HostStatusReporter(initializer_module) - host_status_reporter.start() - - alert_status_reporter = AlertStatusReporter(initializer_module) - alert_status_reporter.start() - - # clean caches for non-existing clusters (ambari-server reset case) - heartbeat_thread.post_registration_actions += [component_status_executor.clean_not_existing_clusters_info, alert_status_reporter.clean_not_existing_clusters_info, host_status_reporter.clean_cache] - + initializer_module.heartbeat_thread.start() + initializer_module.component_status_executor.start() + initializer_module.command_status_reporter.start() + initializer_module.host_status_reporter.start() + initializer_module.alert_status_reporter.start() initializer_module.action_queue.start() while not initializer_module.stop_event.is_set(): @@ -386,11 +367,11 @@ def run_threads(initializer_module): initializer_module.action_queue.interrupt() - command_status_reporter.join() - component_status_executor.join() - host_status_reporter.join() - alert_status_reporter.join() - heartbeat_thread.join() + initializer_module.command_status_reporter.join() + initializer_module.component_status_executor.join() + initializer_module.host_status_reporter.join() + initializer_module.alert_status_reporter.join() + initializer_module.heartbeat_thread.join() initializer_module.action_queue.join() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process diff --git a/ambari-common/src/main/python/resource_management/core/providers/accounts.py b/ambari-common/src/main/python/resource_management/core/providers/accounts.py index fa70989..990169d 100644 --- a/ambari-common/src/main/python/resource_management/core/providers/accounts.py +++ b/ambari-common/src/main/python/resource_management/core/providers/accounts.py @@ -28,8 +28,11 @@ from resource_management.core import shell from resource_management.core.providers import Provider from resource_management.core.logger import Logger from resource_management.core.utils import lazy_property +from resource_management.core.exceptions import ExecutionFailed class UserProvider(Provider): + USERADD_USER_ALREADY_EXISTS_EXITCODE = 9 + options = dict( comment=(lambda self: self.user.pw_gecos, "-c"), gid=(lambda self: grp.getgrgid(self.user.pw_gid).gr_name, "-g"), @@ -42,9 +45,11 @@ class UserProvider(Provider): def action_create(self): if not self.user: + creating_user = True command = ['useradd', "-m"] Logger.info("Adding user %s" % self.resource) else: + creating_user = False command = ['usermod'] for option_name, attributes in self.options.iteritems(): @@ -81,7 +86,14 @@ class UserProvider(Provider): command.append(self.resource.username) - shell.checked_call(command, sudo=True) + try: + shell.checked_call(command, sudo=True) + except ExecutionFailed as ex: + # this "user already exists" can happen due to race condition when multiple processes create user at the same time + if creating_user and ex.code == UserProvider.USERADD_USER_ALREADY_EXISTS_EXITCODE and self.user: + self.action_create() # run modification of the user + else: + raise def action_remove(self): if self.user: -- To stop receiving notification emails like this one, please contact aonis...@apache.org.