[23/50] ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)

2017-03-10 Thread oleewere
AMBARI-20323. Commands timed-out on ambari host without any error logs 
(echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da0b7ad8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da0b7ad8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da0b7ad8

Branch: refs/heads/branch-dev-logsearch
Commit: da0b7ad8f8974dfdeaf0e99d86339c7562cdd9f2
Parents: 8ce47e0
Author: Eugene Chekanskiy 
Authored: Thu Mar 9 18:31:49 2017 +0200
Committer: Eugene Chekanskiy 
Committed: Thu Mar 9 18:31:49 2017 +0200

--
 .../src/main/python/ambari_agent/ActionQueue.py |  52 +---
 .../src/main/python/ambari_agent/Controller.py  |  54 +---
 .../ambari_agent/StatusCommandsExecutor.py  | 307 +++
 .../src/main/python/ambari_agent/main.py|  12 +-
 .../test/python/ambari_agent/TestActionQueue.py |   4 +-
 .../test/python/ambari_agent/TestController.py  |   3 -
 .../src/test/python/ambari_agent/TestMain.py|   9 +-
 7 files changed, 280 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/da0b7ad8/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5300b52..15ae03d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,10 +76,6 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
 super(ActionQueue, self).__init__()
 self.commandQueue = Queue.Queue()
-self.statusCommandQueue = None # the queue this field points to is 
re-created whenever
-   # a new StatusCommandExecutor child process 
is spawned
-   # by Controller
-# multiprocessing.Queue()
 self.statusCommandResultQueue = multiprocessing.Queue() # this queue is 
filled by StatuCommandsExecutor.
 self.backgroundCommandQueue = Queue.Queue()
 self.commandStatuses = CommandStatusDict(callback_action =
@@ -102,25 +98,7 @@ class ActionQueue(threading.Thread):
 return self._stop.isSet()
 
   def put_status(self, commands):
-if not self.statusCommandQueue.empty():
-  #Clear all status commands. Was supposed that we got all set of 
statuses, we don't need to keep old ones
-  statusCommandQueueSize = 0
-  try:
-while not self.statusCommandQueue.empty():
-  self.statusCommandQueue.get(False)
-  statusCommandQueueSize = statusCommandQueueSize + 1
-  except Queue.Empty:
-pass
-
-  logger.info("Number of status commands removed from queue : " + 
str(statusCommandQueueSize))
-
-for command in commands:
-  logger.info("Adding " + command['commandType'] + " for component " + \
-  command['componentName'] + " of service " + \
-  command['serviceName'] + " of cluster " + \
-  command['clusterName'] + " to the queue.")
-  self.statusCommandQueue.put(command)
-  logger.debug(pprint.pformat(command))
+self.controller.statusCommandsExecutor.put_commands(commands)
 
   def put(self, commands):
 for command in commands:
@@ -167,8 +145,8 @@ class ActionQueue(threading.Thread):
   def run(self):
 try:
   while not self.stopped():
-self.processBackgroundQueueSafeEmpty();
-self.processStatusCommandResultQueueSafeEmpty();
+self.processBackgroundQueueSafeEmpty()
+self.process_status_command_results()
 try:
   if self.parallel_execution == 0:
 command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
@@ -212,23 +190,13 @@ class ActionQueue(threading.Thread):
   except Queue.Empty:
 pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-try:
-  while not self.statusCommandResultQueue.empty():
-try:
-  result = self.statusCommandResultQueue.get(False)
-  self.process_status_command_result(result)
-except Queue.Empty:
-  pass
-except IOError:
-  # on race condition in multiprocessing.Queue if get/put and thread 
kill are executed at the same time.
-  # During queue.close IOError will be thrown (this prevents from 
permanently dead-locked get).
-  pass
-except UnicodeDecodeError:
-  pass
-except IOError:
-  # queue.empty() may also throw IOError
-  pass
+  def process_status_command_results(self):
+self.controller.statusCommandsExecutor.process_logs()
+for result in 

[02/16] ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)

2017-03-10 Thread oleewere
AMBARI-20323. Commands timed-out on ambari host without any error logs 
(echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559

Branch: refs/heads/branch-dev-logsearch
Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085
Parents: c6a9a3c
Author: Eugene Chekanskiy 
Authored: Thu Mar 9 18:30:20 2017 +0200
Committer: Eugene Chekanskiy 
Committed: Thu Mar 9 18:30:20 2017 +0200

--
 .../src/main/python/ambari_agent/ActionQueue.py |  52 +---
 .../src/main/python/ambari_agent/Controller.py  |  54 +---
 .../ambari_agent/StatusCommandsExecutor.py  | 307 +++
 .../src/main/python/ambari_agent/main.py|  12 +-
 .../test/python/ambari_agent/TestActionQueue.py |   4 +-
 .../test/python/ambari_agent/TestController.py  |   3 -
 .../src/test/python/ambari_agent/TestMain.py|   9 +-
 7 files changed, 280 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5300b52..15ae03d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,10 +76,6 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
 super(ActionQueue, self).__init__()
 self.commandQueue = Queue.Queue()
-self.statusCommandQueue = None # the queue this field points to is 
re-created whenever
-   # a new StatusCommandExecutor child process 
is spawned
-   # by Controller
-# multiprocessing.Queue()
 self.statusCommandResultQueue = multiprocessing.Queue() # this queue is 
filled by StatuCommandsExecutor.
 self.backgroundCommandQueue = Queue.Queue()
 self.commandStatuses = CommandStatusDict(callback_action =
@@ -102,25 +98,7 @@ class ActionQueue(threading.Thread):
 return self._stop.isSet()
 
   def put_status(self, commands):
-if not self.statusCommandQueue.empty():
-  #Clear all status commands. Was supposed that we got all set of 
statuses, we don't need to keep old ones
-  statusCommandQueueSize = 0
-  try:
-while not self.statusCommandQueue.empty():
-  self.statusCommandQueue.get(False)
-  statusCommandQueueSize = statusCommandQueueSize + 1
-  except Queue.Empty:
-pass
-
-  logger.info("Number of status commands removed from queue : " + 
str(statusCommandQueueSize))
-
-for command in commands:
-  logger.info("Adding " + command['commandType'] + " for component " + \
-  command['componentName'] + " of service " + \
-  command['serviceName'] + " of cluster " + \
-  command['clusterName'] + " to the queue.")
-  self.statusCommandQueue.put(command)
-  logger.debug(pprint.pformat(command))
+self.controller.statusCommandsExecutor.put_commands(commands)
 
   def put(self, commands):
 for command in commands:
@@ -167,8 +145,8 @@ class ActionQueue(threading.Thread):
   def run(self):
 try:
   while not self.stopped():
-self.processBackgroundQueueSafeEmpty();
-self.processStatusCommandResultQueueSafeEmpty();
+self.processBackgroundQueueSafeEmpty()
+self.process_status_command_results()
 try:
   if self.parallel_execution == 0:
 command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
@@ -212,23 +190,13 @@ class ActionQueue(threading.Thread):
   except Queue.Empty:
 pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-try:
-  while not self.statusCommandResultQueue.empty():
-try:
-  result = self.statusCommandResultQueue.get(False)
-  self.process_status_command_result(result)
-except Queue.Empty:
-  pass
-except IOError:
-  # on race condition in multiprocessing.Queue if get/put and thread 
kill are executed at the same time.
-  # During queue.close IOError will be thrown (this prevents from 
permanently dead-locked get).
-  pass
-except UnicodeDecodeError:
-  pass
-except IOError:
-  # queue.empty() may also throw IOError
-  pass
+  def process_status_command_results(self):
+self.controller.statusCommandsExecutor.process_logs()
+for result in 

ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)

2017-03-09 Thread echekanskiy
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 c6a9a3ca4 -> 17ef55594


AMBARI-20323. Commands timed-out on ambari host without any error logs 
(echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559

Branch: refs/heads/branch-2.5
Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085
Parents: c6a9a3c
Author: Eugene Chekanskiy 
Authored: Thu Mar 9 18:30:20 2017 +0200
Committer: Eugene Chekanskiy 
Committed: Thu Mar 9 18:30:20 2017 +0200

--
 .../src/main/python/ambari_agent/ActionQueue.py |  52 +---
 .../src/main/python/ambari_agent/Controller.py  |  54 +---
 .../ambari_agent/StatusCommandsExecutor.py  | 307 +++
 .../src/main/python/ambari_agent/main.py|  12 +-
 .../test/python/ambari_agent/TestActionQueue.py |   4 +-
 .../test/python/ambari_agent/TestController.py  |   3 -
 .../src/test/python/ambari_agent/TestMain.py|   9 +-
 7 files changed, 280 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5300b52..15ae03d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,10 +76,6 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
 super(ActionQueue, self).__init__()
 self.commandQueue = Queue.Queue()
-self.statusCommandQueue = None # the queue this field points to is 
re-created whenever
-   # a new StatusCommandExecutor child process 
is spawned
-   # by Controller
-# multiprocessing.Queue()
 self.statusCommandResultQueue = multiprocessing.Queue() # this queue is 
filled by StatuCommandsExecutor.
 self.backgroundCommandQueue = Queue.Queue()
 self.commandStatuses = CommandStatusDict(callback_action =
@@ -102,25 +98,7 @@ class ActionQueue(threading.Thread):
 return self._stop.isSet()
 
   def put_status(self, commands):
-if not self.statusCommandQueue.empty():
-  #Clear all status commands. Was supposed that we got all set of 
statuses, we don't need to keep old ones
-  statusCommandQueueSize = 0
-  try:
-while not self.statusCommandQueue.empty():
-  self.statusCommandQueue.get(False)
-  statusCommandQueueSize = statusCommandQueueSize + 1
-  except Queue.Empty:
-pass
-
-  logger.info("Number of status commands removed from queue : " + 
str(statusCommandQueueSize))
-
-for command in commands:
-  logger.info("Adding " + command['commandType'] + " for component " + \
-  command['componentName'] + " of service " + \
-  command['serviceName'] + " of cluster " + \
-  command['clusterName'] + " to the queue.")
-  self.statusCommandQueue.put(command)
-  logger.debug(pprint.pformat(command))
+self.controller.statusCommandsExecutor.put_commands(commands)
 
   def put(self, commands):
 for command in commands:
@@ -167,8 +145,8 @@ class ActionQueue(threading.Thread):
   def run(self):
 try:
   while not self.stopped():
-self.processBackgroundQueueSafeEmpty();
-self.processStatusCommandResultQueueSafeEmpty();
+self.processBackgroundQueueSafeEmpty()
+self.process_status_command_results()
 try:
   if self.parallel_execution == 0:
 command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
@@ -212,23 +190,13 @@ class ActionQueue(threading.Thread):
   except Queue.Empty:
 pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-try:
-  while not self.statusCommandResultQueue.empty():
-try:
-  result = self.statusCommandResultQueue.get(False)
-  self.process_status_command_result(result)
-except Queue.Empty:
-  pass
-except IOError:
-  # on race condition in multiprocessing.Queue if get/put and thread 
kill are executed at the same time.
-  # During queue.close IOError will be thrown (this prevents from 
permanently dead-locked get).
-  pass
-except UnicodeDecodeError:
-  pass
-except IOError:
-  # queue.empty() may also throw IOError
-  pass
+  def process_status_command_results(self):
+

ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)

2017-03-09 Thread echekanskiy
Repository: ambari
Updated Branches:
  refs/heads/trunk 8ce47e0cb -> da0b7ad8f


AMBARI-20323. Commands timed-out on ambari host without any error logs 
(echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da0b7ad8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da0b7ad8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da0b7ad8

Branch: refs/heads/trunk
Commit: da0b7ad8f8974dfdeaf0e99d86339c7562cdd9f2
Parents: 8ce47e0
Author: Eugene Chekanskiy 
Authored: Thu Mar 9 18:31:49 2017 +0200
Committer: Eugene Chekanskiy 
Committed: Thu Mar 9 18:31:49 2017 +0200

--
 .../src/main/python/ambari_agent/ActionQueue.py |  52 +---
 .../src/main/python/ambari_agent/Controller.py  |  54 +---
 .../ambari_agent/StatusCommandsExecutor.py  | 307 +++
 .../src/main/python/ambari_agent/main.py|  12 +-
 .../test/python/ambari_agent/TestActionQueue.py |   4 +-
 .../test/python/ambari_agent/TestController.py  |   3 -
 .../src/test/python/ambari_agent/TestMain.py|   9 +-
 7 files changed, 280 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/da0b7ad8/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5300b52..15ae03d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,10 +76,6 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
 super(ActionQueue, self).__init__()
 self.commandQueue = Queue.Queue()
-self.statusCommandQueue = None # the queue this field points to is 
re-created whenever
-   # a new StatusCommandExecutor child process 
is spawned
-   # by Controller
-# multiprocessing.Queue()
 self.statusCommandResultQueue = multiprocessing.Queue() # this queue is 
filled by StatuCommandsExecutor.
 self.backgroundCommandQueue = Queue.Queue()
 self.commandStatuses = CommandStatusDict(callback_action =
@@ -102,25 +98,7 @@ class ActionQueue(threading.Thread):
 return self._stop.isSet()
 
   def put_status(self, commands):
-if not self.statusCommandQueue.empty():
-  #Clear all status commands. Was supposed that we got all set of 
statuses, we don't need to keep old ones
-  statusCommandQueueSize = 0
-  try:
-while not self.statusCommandQueue.empty():
-  self.statusCommandQueue.get(False)
-  statusCommandQueueSize = statusCommandQueueSize + 1
-  except Queue.Empty:
-pass
-
-  logger.info("Number of status commands removed from queue : " + 
str(statusCommandQueueSize))
-
-for command in commands:
-  logger.info("Adding " + command['commandType'] + " for component " + \
-  command['componentName'] + " of service " + \
-  command['serviceName'] + " of cluster " + \
-  command['clusterName'] + " to the queue.")
-  self.statusCommandQueue.put(command)
-  logger.debug(pprint.pformat(command))
+self.controller.statusCommandsExecutor.put_commands(commands)
 
   def put(self, commands):
 for command in commands:
@@ -167,8 +145,8 @@ class ActionQueue(threading.Thread):
   def run(self):
 try:
   while not self.stopped():
-self.processBackgroundQueueSafeEmpty();
-self.processStatusCommandResultQueueSafeEmpty();
+self.processBackgroundQueueSafeEmpty()
+self.process_status_command_results()
 try:
   if self.parallel_execution == 0:
 command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
@@ -212,23 +190,13 @@ class ActionQueue(threading.Thread):
   except Queue.Empty:
 pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-try:
-  while not self.statusCommandResultQueue.empty():
-try:
-  result = self.statusCommandResultQueue.get(False)
-  self.process_status_command_result(result)
-except Queue.Empty:
-  pass
-except IOError:
-  # on race condition in multiprocessing.Queue if get/put and thread 
kill are executed at the same time.
-  # During queue.close IOError will be thrown (this prevents from 
permanently dead-locked get).
-  pass
-except UnicodeDecodeError:
-  pass
-except IOError:
-  # queue.empty() may also throw IOError
-  pass
+  def process_status_command_results(self):
+self.controller.statusCommandsExecutor.process_logs()
+