[23/50] ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da0b7ad8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da0b7ad8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da0b7ad8 Branch: refs/heads/branch-dev-logsearch Commit: da0b7ad8f8974dfdeaf0e99d86339c7562cdd9f2 Parents: 8ce47e0 Author: Eugene ChekanskiyAuthored: Thu Mar 9 18:31:49 2017 +0200 Committer: Eugene Chekanskiy Committed: Thu Mar 9 18:31:49 2017 +0200 -- .../src/main/python/ambari_agent/ActionQueue.py | 52 +--- .../src/main/python/ambari_agent/Controller.py | 54 +--- .../ambari_agent/StatusCommandsExecutor.py | 307 +++ .../src/main/python/ambari_agent/main.py| 12 +- .../test/python/ambari_agent/TestActionQueue.py | 4 +- .../test/python/ambari_agent/TestController.py | 3 - .../src/test/python/ambari_agent/TestMain.py| 9 +- 7 files changed, 280 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/da0b7ad8/ambari-agent/src/main/python/ambari_agent/ActionQueue.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5300b52..15ae03d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,10 +76,6 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() -self.statusCommandQueue = None # the queue this field points to is re-created whenever - # a new StatusCommandExecutor child process is spawned - # by Controller -# multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = @@ -102,25 +98,7 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): -if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - try: -while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get(False) - statusCommandQueueSize = statusCommandQueueSize + 1 - except Queue.Empty: -pass - - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) - -for command in commands: - logger.info("Adding " + command['commandType'] + " for component " + \ - command['componentName'] + " of service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") - self.statusCommandQueue.put(command) - logger.debug(pprint.pformat(command)) +self.controller.statusCommandsExecutor.put_commands(commands) def put(self, commands): for command in commands: @@ -167,8 +145,8 @@ class ActionQueue(threading.Thread): def run(self): try: while not self.stopped(): -self.processBackgroundQueueSafeEmpty(); -self.processStatusCommandResultQueueSafeEmpty(); +self.processBackgroundQueueSafeEmpty() +self.process_status_command_results() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -212,23 +190,13 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): -try: - while not self.statusCommandResultQueue.empty(): -try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) -except Queue.Empty: - pass -except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass -except UnicodeDecodeError: - pass -except IOError: - # queue.empty() may also throw IOError - pass + def process_status_command_results(self): +self.controller.statusCommandsExecutor.process_logs() +for result in
[02/16] ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559 Branch: refs/heads/branch-dev-logsearch Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085 Parents: c6a9a3c Author: Eugene ChekanskiyAuthored: Thu Mar 9 18:30:20 2017 +0200 Committer: Eugene Chekanskiy Committed: Thu Mar 9 18:30:20 2017 +0200 -- .../src/main/python/ambari_agent/ActionQueue.py | 52 +--- .../src/main/python/ambari_agent/Controller.py | 54 +--- .../ambari_agent/StatusCommandsExecutor.py | 307 +++ .../src/main/python/ambari_agent/main.py| 12 +- .../test/python/ambari_agent/TestActionQueue.py | 4 +- .../test/python/ambari_agent/TestController.py | 3 - .../src/test/python/ambari_agent/TestMain.py| 9 +- 7 files changed, 280 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5300b52..15ae03d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,10 +76,6 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() -self.statusCommandQueue = None # the queue this field points to is re-created whenever - # a new StatusCommandExecutor child process is spawned - # by Controller -# multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = @@ -102,25 +98,7 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): -if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - try: -while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get(False) - statusCommandQueueSize = statusCommandQueueSize + 1 - except Queue.Empty: -pass - - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) - -for command in commands: - logger.info("Adding " + command['commandType'] + " for component " + \ - command['componentName'] + " of service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") - self.statusCommandQueue.put(command) - logger.debug(pprint.pformat(command)) +self.controller.statusCommandsExecutor.put_commands(commands) def put(self, commands): for command in commands: @@ -167,8 +145,8 @@ class ActionQueue(threading.Thread): def run(self): try: while not self.stopped(): -self.processBackgroundQueueSafeEmpty(); -self.processStatusCommandResultQueueSafeEmpty(); +self.processBackgroundQueueSafeEmpty() +self.process_status_command_results() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -212,23 +190,13 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): -try: - while not self.statusCommandResultQueue.empty(): -try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) -except Queue.Empty: - pass -except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass -except UnicodeDecodeError: - pass -except IOError: - # queue.empty() may also throw IOError - pass + def process_status_command_results(self): +self.controller.statusCommandsExecutor.process_logs() +for result in
ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
Repository: ambari Updated Branches: refs/heads/branch-2.5 c6a9a3ca4 -> 17ef55594 AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559 Branch: refs/heads/branch-2.5 Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085 Parents: c6a9a3c Author: Eugene ChekanskiyAuthored: Thu Mar 9 18:30:20 2017 +0200 Committer: Eugene Chekanskiy Committed: Thu Mar 9 18:30:20 2017 +0200 -- .../src/main/python/ambari_agent/ActionQueue.py | 52 +--- .../src/main/python/ambari_agent/Controller.py | 54 +--- .../ambari_agent/StatusCommandsExecutor.py | 307 +++ .../src/main/python/ambari_agent/main.py| 12 +- .../test/python/ambari_agent/TestActionQueue.py | 4 +- .../test/python/ambari_agent/TestController.py | 3 - .../src/test/python/ambari_agent/TestMain.py| 9 +- 7 files changed, 280 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5300b52..15ae03d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,10 +76,6 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() -self.statusCommandQueue = None # the queue this field points to is re-created whenever - # a new StatusCommandExecutor child process is spawned - # by Controller -# multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = @@ -102,25 +98,7 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): -if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - try: -while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get(False) - statusCommandQueueSize = statusCommandQueueSize + 1 - except Queue.Empty: -pass - - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) - -for command in commands: - logger.info("Adding " + command['commandType'] + " for component " + \ - command['componentName'] + " of service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") - self.statusCommandQueue.put(command) - logger.debug(pprint.pformat(command)) +self.controller.statusCommandsExecutor.put_commands(commands) def put(self, commands): for command in commands: @@ -167,8 +145,8 @@ class ActionQueue(threading.Thread): def run(self): try: while not self.stopped(): -self.processBackgroundQueueSafeEmpty(); -self.processStatusCommandResultQueueSafeEmpty(); +self.processBackgroundQueueSafeEmpty() +self.process_status_command_results() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -212,23 +190,13 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): -try: - while not self.statusCommandResultQueue.empty(): -try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) -except Queue.Empty: - pass -except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass -except UnicodeDecodeError: - pass -except IOError: - # queue.empty() may also throw IOError - pass + def process_status_command_results(self): +
ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
Repository: ambari Updated Branches: refs/heads/trunk 8ce47e0cb -> da0b7ad8f AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da0b7ad8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da0b7ad8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da0b7ad8 Branch: refs/heads/trunk Commit: da0b7ad8f8974dfdeaf0e99d86339c7562cdd9f2 Parents: 8ce47e0 Author: Eugene ChekanskiyAuthored: Thu Mar 9 18:31:49 2017 +0200 Committer: Eugene Chekanskiy Committed: Thu Mar 9 18:31:49 2017 +0200 -- .../src/main/python/ambari_agent/ActionQueue.py | 52 +--- .../src/main/python/ambari_agent/Controller.py | 54 +--- .../ambari_agent/StatusCommandsExecutor.py | 307 +++ .../src/main/python/ambari_agent/main.py| 12 +- .../test/python/ambari_agent/TestActionQueue.py | 4 +- .../test/python/ambari_agent/TestController.py | 3 - .../src/test/python/ambari_agent/TestMain.py| 9 +- 7 files changed, 280 insertions(+), 161 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/da0b7ad8/ambari-agent/src/main/python/ambari_agent/ActionQueue.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5300b52..15ae03d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,10 +76,6 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() -self.statusCommandQueue = None # the queue this field points to is re-created whenever - # a new StatusCommandExecutor child process is spawned - # by Controller -# multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = @@ -102,25 +98,7 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): -if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - try: -while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get(False) - statusCommandQueueSize = statusCommandQueueSize + 1 - except Queue.Empty: -pass - - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) - -for command in commands: - logger.info("Adding " + command['commandType'] + " for component " + \ - command['componentName'] + " of service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") - self.statusCommandQueue.put(command) - logger.debug(pprint.pformat(command)) +self.controller.statusCommandsExecutor.put_commands(commands) def put(self, commands): for command in commands: @@ -167,8 +145,8 @@ class ActionQueue(threading.Thread): def run(self): try: while not self.stopped(): -self.processBackgroundQueueSafeEmpty(); -self.processStatusCommandResultQueueSafeEmpty(); +self.processBackgroundQueueSafeEmpty() +self.process_status_command_results() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -212,23 +190,13 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): -try: - while not self.statusCommandResultQueue.empty(): -try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) -except Queue.Empty: - pass -except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass -except UnicodeDecodeError: - pass -except IOError: - # queue.empty() may also throw IOError - pass + def process_status_command_results(self): +self.controller.statusCommandsExecutor.process_logs() +