Repository: ambari
Updated Branches:
  refs/heads/trunk 3dd56eb44 -> b56019031


AMBARI-5280. Storm daemons need to run under supervision (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b5601903
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b5601903
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b5601903

Branch: refs/heads/trunk
Commit: b56019031443e174f0e09ec753259e48b48457b6
Parents: 3dd56eb
Author: Andrew Onischuk <aonis...@hortonworks.com>
Authored: Mon Mar 31 08:23:30 2014 -0700
Committer: Andrew Onischuk <aonis...@hortonworks.com>
Committed: Mon Mar 31 08:23:30 2014 -0700

----------------------------------------------------------------------
 .../services/STORM/package/scripts/nimbus.py    |   1 -
 .../STORM/package/scripts/nimbus_prod.py        |  55 +++++
 .../STORM/package/scripts/supervisor_prod.py    |  57 +++++
 .../package/scripts/supervisord_service.py      |  32 +++
 .../stacks/2.1/STORM/test_storm_nimbus_prod.py  | 163 +++++++++++++++
 .../2.1/STORM/test_storm_supervisor_prod.py     | 208 +++++++++++++++++++
 6 files changed, 515 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py
index 4ddc28f..2dd7963 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py
@@ -22,7 +22,6 @@ import sys
 from resource_management import *
 from storm import storm
 from service import service
-from service_check import ServiceCheck
 
 
 class Nimbus(Script):

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py
new file mode 100644
index 0000000..a1f8afe
--- /dev/null
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management import *
+from storm import storm
+from supervisord_service import supervisord_service, supervisord_check_status
+
+
+class Nimbus(Script):
+  def install(self, env):
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    import params
+    env.set_params(params)
+
+    storm()
+
+  def start(self, env):
+    import params
+    env.set_params(params)
+    self.configure(env)
+
+    supervisord_service("nimbus", action="start")
+
+  def stop(self, env):
+    import params
+    env.set_params(params)
+
+    supervisord_service("nimbus", action="stop")
+
+  def status(self, env):
+    supervisord_check_status("nimbus")
+
+if __name__ == "__main__":
+  Nimbus().execute()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py
new file mode 100644
index 0000000..7aecdc5
--- /dev/null
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management import *
+from storm import storm
+from service import service
+from supervisord_service import supervisord_service, supervisord_check_status
+
+
+class Supervisor(Script):
+  def install(self, env):
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    import params
+    env.set_params(params)
+    storm()
+
+  def start(self, env):
+    import params
+    env.set_params(params)
+    self.configure(env)
+
+    supervisord_service("supervisor", action="start")
+    service("logviewer", action="start")
+
+  def stop(self, env):
+    import params
+    env.set_params(params)
+
+    supervisord_service("supervisor", action="stop")
+    service("logviewer", action="stop")
+
+  def status(self, env):
+    supervisord_check_status("supervisor")
+
+if __name__ == "__main__":
+  Supervisor().execute()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py
new file mode 100644
index 0000000..6a5ea0b
--- /dev/null
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+def supervisord_service(component_name, action):
+  Execute(format("supervisorctl {action} storm-{component_name}"),
+    wait_for_finish=False
+  )
+
+def supervisord_check_status(component_name):
+  try:
+    Execute(format("supervisorctl status storm-{component_name} | grep 
RUNNING"))
+  except Fail:
+    raise ComponentIsNotRunning() 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py 
b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
new file mode 100644
index 0000000..b236083
--- /dev/null
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from mock.mock import MagicMock, call, patch
+from stacks.utils.RMFTestCase import *
+import  resource_management.core.source
+
+@patch.object(resource_management.core.source, "InlineTemplate", new = 
MagicMock(return_value='InlineTemplateMock'))
+class TestStormNimbus(RMFTestCase):
+
+  def test_configure_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "configure",
+                       config_file="default.json"
+    )
+    self.assert_configure_default()
+    self.assertNoMoreResources()
+
+  def test_start_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "start",
+                       config_file="default.json"
+    )
+
+    self.assert_configure_default()
+    self.assertResourceCalled('Execute', 'supervisorctl start storm-nimbus',
+                        wait_for_finish = False,
+    )
+    
+    self.assertNoMoreResources()
+
+  def test_stop_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "stop",
+                       config_file="default.json"
+    )
+    self.assertResourceCalled('Execute', 'supervisorctl stop storm-nimbus',
+                              wait_for_finish = False,
+    )
+    self.assertNoMoreResources()
+
+  def test_configure_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "configure",
+                       config_file="secured.json"
+    )
+    self.assert_configure_secured()
+    self.assertNoMoreResources()
+
+  def test_start_secured(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "start",
+                       config_file="secured.json"
+    )
+
+    self.assert_configure_secured()
+    self.assertResourceCalled('Execute', 'supervisorctl start storm-nimbus',
+                        wait_for_finish = False,
+    )
+    
+    self.assertNoMoreResources()
+
+  def test_stop_secured(self):
+    self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py",
+                       classname = "Nimbus",
+                       command = "stop",
+                       config_file="secured.json"
+    )
+    self.assertResourceCalled('Execute', 'supervisorctl stop storm-nimbus',
+                              wait_for_finish = False,
+    )
+    self.assertNoMoreResources()
+
+  def assert_configure_default(self):
+    self.assertResourceCalled('Directory', '/var/log/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/var/run/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/hadoop/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/etc/storm/conf',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/config.yaml',
+      owner = 'storm',
+      content = Template('config.yaml.j2'),
+      group = 'hadoop',
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml',
+      owner = 'storm',
+      content = 'InlineTemplateMock',
+      group = 'hadoop',
+      mode = None,
+    )
+
+  def assert_configure_secured(self):
+    self.assertResourceCalled('Directory', '/var/log/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/var/run/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/hadoop/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/etc/storm/conf',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/config.yaml',
+      owner = 'storm',
+      content = Template('config.yaml.j2'),
+      group = 'hadoop',
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml',
+      owner = 'storm',
+      content = 'InlineTemplateMock',
+      group = 'hadoop',
+      mode = None,
+    )    
+    self.assertResourceCalled('TemplateConfig', 
'/etc/storm/conf/storm_jaas.conf',
+      owner = 'storm',
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py 
b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
new file mode 100644
index 0000000..91398e7
--- /dev/null
+++ 
b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from mock.mock import MagicMock, call, patch
+from stacks.utils.RMFTestCase import *
+import  resource_management.core.source
+
+@patch.object(resource_management.core.source, "InlineTemplate", new = 
MagicMock(return_value='InlineTemplateMock'))
+class TestStormSupervisor(RMFTestCase):
+
+  def test_configure_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "configure",
+                       config_file="default.json"
+    )
+    self.assert_configure_default()
+    self.assertNoMoreResources()
+
+  def test_start_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "start",
+                       config_file="default.json"
+    )
+
+    self.assert_configure_default()
+    
+    self.assertResourceCalled('Execute', 'supervisorctl start 
storm-supervisor',
+      wait_for_finish = False,
+    )
+    self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 
PATH=$PATH:/usr/jdk64/jdk1.7.0_45/bin /usr/bin/storm logviewer',
+      wait_for_finish = False,
+      not_if = 'ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      user = 'storm',
+    )
+    self.assertResourceCalled('Execute', 'pgrep -f 
"^java.+backtype.storm.daemon.logviewer$" && pgrep -f 
"^java.+backtype.storm.daemon.logviewer$" > /var/run/storm/logviewer.pid',
+      logoutput = True,
+      tries = 6,
+      user = 'storm',
+      try_sleep = 10,
+    )
+    
+    self.assertNoMoreResources()
+
+  def test_stop_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "stop",
+                       config_file="default.json"
+    )
+    self.assertResourceCalled('Execute', 'supervisorctl stop storm-supervisor',
+                              wait_for_finish = False,
+    )
+    self.assertResourceCalled('Execute', 'kill `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      not_if = '! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1)',
+    )
+    self.assertResourceCalled('Execute', 'kill -9 `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      not_if = 'sleep 2; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && 
ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1) || sleep 20; ! (ls 
/var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1)',
+      ignore_failures = True,
+    )
+    self.assertResourceCalled('Execute', 'rm -f /var/run/storm/logviewer.pid',
+    )
+    
+    self.assertNoMoreResources()
+
+  def test_configure_default(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "configure",
+                       config_file="secured.json"
+    )
+    self.assert_configure_secured()
+    self.assertNoMoreResources()
+
+  def test_start_secured(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "start",
+                       config_file="secured.json"
+    )
+
+    self.assert_configure_secured()
+    
+    self.assertResourceCalled('Execute', 'supervisorctl start 
storm-supervisor',
+                        wait_for_finish = False,
+    )
+    self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 
PATH=$PATH:/usr/jdk64/jdk1.7.0_45/bin /usr/bin/storm logviewer',
+      wait_for_finish = False,
+      not_if = 'ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      user = 'storm',
+    )
+    self.assertResourceCalled('Execute', 'pgrep -f 
"^java.+backtype.storm.daemon.logviewer$" && pgrep -f 
"^java.+backtype.storm.daemon.logviewer$" > /var/run/storm/logviewer.pid',
+      logoutput = True,
+      tries = 6,
+      user = 'storm',
+      try_sleep = 10,
+    )
+    
+    self.assertNoMoreResources()
+
+  def test_stop_secured(self):
+    self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py",
+                       classname = "Supervisor",
+                       command = "stop",
+                       config_file="secured.json"
+    )
+    
+    self.assertResourceCalled('Execute', 'supervisorctl stop storm-supervisor',
+                              wait_for_finish = False,
+    )
+    self.assertResourceCalled('Execute', 'kill `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      not_if = '! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1)',
+    )
+    self.assertResourceCalled('Execute', 'kill -9 `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1',
+      not_if = 'sleep 2; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && 
ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1) || sleep 20; ! (ls 
/var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat 
/var/run/storm/logviewer.pid` >/dev/null 2>&1)',
+      ignore_failures = True,
+    )
+    self.assertResourceCalled('Execute', 'rm -f /var/run/storm/logviewer.pid',
+    )
+    
+    self.assertNoMoreResources()
+
+  def assert_configure_default(self):
+    self.assertResourceCalled('Directory', '/var/log/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/var/run/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/hadoop/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/etc/storm/conf',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/config.yaml',
+      owner = 'storm',
+      content = Template('config.yaml.j2'),
+      group = 'hadoop',
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml',
+      owner = 'storm',
+      content = 'InlineTemplateMock',
+      group = 'hadoop',
+      mode = None,
+    )
+
+  def assert_configure_secured(self):
+    self.assertResourceCalled('Directory', '/var/log/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/var/run/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/hadoop/storm',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('Directory', '/etc/storm/conf',
+      owner = 'storm',
+      group = 'hadoop',
+      recursive = True,
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/config.yaml',
+      owner = 'storm',
+      content = Template('config.yaml.j2'),
+      group = 'hadoop',
+    )    
+    self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml',
+      owner = 'storm',
+      content = 'InlineTemplateMock',
+      group = 'hadoop',
+      mode = None,
+    )    
+    self.assertResourceCalled('TemplateConfig', 
'/etc/storm/conf/storm_jaas.conf',
+      owner = 'storm',
+    )

Reply via email to