Repository: ambari Updated Branches: refs/heads/trunk 3dd56eb44 -> b56019031
AMBARI-5280. Storm daemons need to run under supervision (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b5601903 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b5601903 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b5601903 Branch: refs/heads/trunk Commit: b56019031443e174f0e09ec753259e48b48457b6 Parents: 3dd56eb Author: Andrew Onischuk <aonis...@hortonworks.com> Authored: Mon Mar 31 08:23:30 2014 -0700 Committer: Andrew Onischuk <aonis...@hortonworks.com> Committed: Mon Mar 31 08:23:30 2014 -0700 ---------------------------------------------------------------------- .../services/STORM/package/scripts/nimbus.py | 1 - .../STORM/package/scripts/nimbus_prod.py | 55 +++++ .../STORM/package/scripts/supervisor_prod.py | 57 +++++ .../package/scripts/supervisord_service.py | 32 +++ .../stacks/2.1/STORM/test_storm_nimbus_prod.py | 163 +++++++++++++++ .../2.1/STORM/test_storm_supervisor_prod.py | 208 +++++++++++++++++++ 6 files changed, 515 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py index 4ddc28f..2dd7963 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus.py @@ -22,7 +22,6 @@ import sys from resource_management import * from storm import storm from service import service -from service_check import ServiceCheck class Nimbus(Script): http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py new file mode 100644 index 0000000..a1f8afe --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/nimbus_prod.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +from resource_management import * +from storm import storm +from supervisord_service import supervisord_service, supervisord_check_status + + +class Nimbus(Script): + def install(self, env): + self.install_packages(env) + self.configure(env) + + def configure(self, env): + import params + env.set_params(params) + + storm() + + def start(self, env): + import params + env.set_params(params) + self.configure(env) + + supervisord_service("nimbus", action="start") + + def stop(self, env): + import params + env.set_params(params) + + supervisord_service("nimbus", action="stop") + + def status(self, env): + supervisord_check_status("nimbus") + +if __name__ == "__main__": + Nimbus().execute() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py new file mode 100644 index 0000000..7aecdc5 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisor_prod.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +from resource_management import * +from storm import storm +from service import service +from supervisord_service import supervisord_service, supervisord_check_status + + +class Supervisor(Script): + def install(self, env): + self.install_packages(env) + self.configure(env) + + def configure(self, env): + import params + env.set_params(params) + storm() + + def start(self, env): + import params + env.set_params(params) + self.configure(env) + + supervisord_service("supervisor", action="start") + service("logviewer", action="start") + + def stop(self, env): + import params + env.set_params(params) + + supervisord_service("supervisor", action="stop") + service("logviewer", action="stop") + + def status(self, env): + supervisord_check_status("supervisor") + +if __name__ == "__main__": + Supervisor().execute() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py new file mode 100644 index 0000000..6a5ea0b --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/supervisord_service.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management import * + +def supervisord_service(component_name, action): + Execute(format("supervisorctl {action} storm-{component_name}"), + wait_for_finish=False + ) + +def supervisord_check_status(component_name): + try: + Execute(format("supervisorctl status storm-{component_name} | grep RUNNING")) + except Fail: + raise ComponentIsNotRunning() http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py new file mode 100644 index 0000000..b236083 --- /dev/null +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from mock.mock import MagicMock, call, patch +from stacks.utils.RMFTestCase import * +import resource_management.core.source + +@patch.object(resource_management.core.source, "InlineTemplate", new = MagicMock(return_value='InlineTemplateMock')) +class TestStormNimbus(RMFTestCase): + + def test_configure_default(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "configure", + config_file="default.json" + ) + self.assert_configure_default() + self.assertNoMoreResources() + + def test_start_default(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "start", + config_file="default.json" + ) + + self.assert_configure_default() + self.assertResourceCalled('Execute', 'supervisorctl start storm-nimbus', + wait_for_finish = False, + ) + + self.assertNoMoreResources() + + def test_stop_default(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "stop", + config_file="default.json" + ) + self.assertResourceCalled('Execute', 'supervisorctl stop storm-nimbus', + wait_for_finish = False, + ) + self.assertNoMoreResources() + + def test_configure_default(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "configure", + config_file="secured.json" + ) + self.assert_configure_secured() + self.assertNoMoreResources() + + def test_start_secured(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "start", + config_file="secured.json" + ) + + self.assert_configure_secured() + self.assertResourceCalled('Execute', 'supervisorctl start storm-nimbus', + wait_for_finish = False, + ) + + self.assertNoMoreResources() + + def test_stop_secured(self): + self.executeScript("2.1/services/STORM/package/scripts/nimbus_prod.py", + classname = "Nimbus", + command = "stop", + config_file="secured.json" + ) + self.assertResourceCalled('Execute', 'supervisorctl stop storm-nimbus', + wait_for_finish = False, + ) + self.assertNoMoreResources() + + def assert_configure_default(self): + self.assertResourceCalled('Directory', '/var/log/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/var/run/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/hadoop/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/etc/storm/conf', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('File', '/etc/storm/conf/config.yaml', + owner = 'storm', + content = Template('config.yaml.j2'), + group = 'hadoop', + ) + self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml', + owner = 'storm', + content = 'InlineTemplateMock', + group = 'hadoop', + mode = None, + ) + + def assert_configure_secured(self): + self.assertResourceCalled('Directory', '/var/log/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/var/run/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/hadoop/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/etc/storm/conf', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('File', '/etc/storm/conf/config.yaml', + owner = 'storm', + content = Template('config.yaml.j2'), + group = 'hadoop', + ) + self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml', + owner = 'storm', + content = 'InlineTemplateMock', + group = 'hadoop', + mode = None, + ) + self.assertResourceCalled('TemplateConfig', '/etc/storm/conf/storm_jaas.conf', + owner = 'storm', + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/b5601903/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py new file mode 100644 index 0000000..91398e7 --- /dev/null +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from mock.mock import MagicMock, call, patch +from stacks.utils.RMFTestCase import * +import resource_management.core.source + +@patch.object(resource_management.core.source, "InlineTemplate", new = MagicMock(return_value='InlineTemplateMock')) +class TestStormSupervisor(RMFTestCase): + + def test_configure_default(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "configure", + config_file="default.json" + ) + self.assert_configure_default() + self.assertNoMoreResources() + + def test_start_default(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "start", + config_file="default.json" + ) + + self.assert_configure_default() + + self.assertResourceCalled('Execute', 'supervisorctl start storm-supervisor', + wait_for_finish = False, + ) + self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 PATH=$PATH:/usr/jdk64/jdk1.7.0_45/bin /usr/bin/storm logviewer', + wait_for_finish = False, + not_if = 'ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + user = 'storm', + ) + self.assertResourceCalled('Execute', 'pgrep -f "^java.+backtype.storm.daemon.logviewer$" && pgrep -f "^java.+backtype.storm.daemon.logviewer$" > /var/run/storm/logviewer.pid', + logoutput = True, + tries = 6, + user = 'storm', + try_sleep = 10, + ) + + self.assertNoMoreResources() + + def test_stop_default(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "stop", + config_file="default.json" + ) + self.assertResourceCalled('Execute', 'supervisorctl stop storm-supervisor', + wait_for_finish = False, + ) + self.assertResourceCalled('Execute', 'kill `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + not_if = '! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1)', + ) + self.assertResourceCalled('Execute', 'kill -9 `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + not_if = 'sleep 2; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1) || sleep 20; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1)', + ignore_failures = True, + ) + self.assertResourceCalled('Execute', 'rm -f /var/run/storm/logviewer.pid', + ) + + self.assertNoMoreResources() + + def test_configure_default(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "configure", + config_file="secured.json" + ) + self.assert_configure_secured() + self.assertNoMoreResources() + + def test_start_secured(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "start", + config_file="secured.json" + ) + + self.assert_configure_secured() + + self.assertResourceCalled('Execute', 'supervisorctl start storm-supervisor', + wait_for_finish = False, + ) + self.assertResourceCalled('Execute', 'env JAVA_HOME=/usr/jdk64/jdk1.7.0_45 PATH=$PATH:/usr/jdk64/jdk1.7.0_45/bin /usr/bin/storm logviewer', + wait_for_finish = False, + not_if = 'ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + user = 'storm', + ) + self.assertResourceCalled('Execute', 'pgrep -f "^java.+backtype.storm.daemon.logviewer$" && pgrep -f "^java.+backtype.storm.daemon.logviewer$" > /var/run/storm/logviewer.pid', + logoutput = True, + tries = 6, + user = 'storm', + try_sleep = 10, + ) + + self.assertNoMoreResources() + + def test_stop_secured(self): + self.executeScript("2.1/services/STORM/package/scripts/supervisor_prod.py", + classname = "Supervisor", + command = "stop", + config_file="secured.json" + ) + + self.assertResourceCalled('Execute', 'supervisorctl stop storm-supervisor', + wait_for_finish = False, + ) + self.assertResourceCalled('Execute', 'kill `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + not_if = '! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1)', + ) + self.assertResourceCalled('Execute', 'kill -9 `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1', + not_if = 'sleep 2; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1) || sleep 20; ! (ls /var/run/storm/logviewer.pid >/dev/null 2>&1 && ps `cat /var/run/storm/logviewer.pid` >/dev/null 2>&1)', + ignore_failures = True, + ) + self.assertResourceCalled('Execute', 'rm -f /var/run/storm/logviewer.pid', + ) + + self.assertNoMoreResources() + + def assert_configure_default(self): + self.assertResourceCalled('Directory', '/var/log/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/var/run/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/hadoop/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/etc/storm/conf', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('File', '/etc/storm/conf/config.yaml', + owner = 'storm', + content = Template('config.yaml.j2'), + group = 'hadoop', + ) + self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml', + owner = 'storm', + content = 'InlineTemplateMock', + group = 'hadoop', + mode = None, + ) + + def assert_configure_secured(self): + self.assertResourceCalled('Directory', '/var/log/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/var/run/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/hadoop/storm', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('Directory', '/etc/storm/conf', + owner = 'storm', + group = 'hadoop', + recursive = True, + ) + self.assertResourceCalled('File', '/etc/storm/conf/config.yaml', + owner = 'storm', + content = Template('config.yaml.j2'), + group = 'hadoop', + ) + self.assertResourceCalled('File', '/etc/storm/conf/storm.yaml', + owner = 'storm', + content = 'InlineTemplateMock', + group = 'hadoop', + mode = None, + ) + self.assertResourceCalled('TemplateConfig', '/etc/storm/conf/storm_jaas.conf', + owner = 'storm', + )