Repository: storm Updated Branches: refs/heads/master 5eff2e761 -> 07098e8eb
STORM-675. Allow users to have storm-env.sh under config dir to set custom JAVA_HOME and other env variables. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4a8f7d2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4a8f7d2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4a8f7d2 Branch: refs/heads/master Commit: a4a8f7d22bf79a04d4cddd067f2a63766084ebfe Parents: 5ca7e80 Author: Sriharsha Chintalapani <m...@harsha.io> Authored: Tue Feb 17 17:11:50 2015 -0800 Committer: Sriharsha Chintalapani <m...@harsha.io> Committed: Tue Feb 17 17:11:50 2015 -0800 ---------------------------------------------------------------------- bin/storm | 586 ++++++------------------------------------------- bin/storm.py | 543 +++++++++++++++++++++++++++++++++++++++++++++ conf/storm-env.sh | 26 +++ 3 files changed, 632 insertions(+), 523 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/bin/storm ---------------------------------------------------------------------- diff --git a/bin/storm b/bin/storm index 410a1b0..fffd839 100755 --- a/bin/storm +++ b/bin/storm @@ -1,5 +1,7 @@ -#!/usr/bin/python - +#!/bin/bash +# +# Copyright 2014 The Apache Software Foundation +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -8,531 +10,69 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# -import os -import sys -import random -import subprocess as sub -import re -import shlex -try: - # python 3 - from urllib.parse import quote_plus -except ImportError: - # python 2 - from urllib import quote_plus -try: - # python 3 - import configparser -except ImportError: - # python 2 - import ConfigParser as configparser - -def identity(x): - return x - -def cygpath(x): - command = ["cygpath", "-wp", x] - p = sub.Popen(command,stdout=sub.PIPE) - output, errors = p.communicate() - lines = output.split(os.linesep) - return lines[0] - -def init_storm_env(): - global CLUSTER_CONF_DIR - ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini') - if not os.path.isfile(ini_file): - return - config = configparser.ConfigParser() - config.optionxform = str - config.read(ini_file) - options = config.options('environment') - for option in options: - value = config.get('environment', option) - os.environ[option] = value - -normclasspath = cygpath if sys.platform == 'cygwin' else identity -STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2]) -USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") -STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None) - -if STORM_CONF_DIR == None: - CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf") -else: - CLUSTER_CONF_DIR = STORM_CONF_DIR - -if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))): - USER_CONF_DIR = CLUSTER_CONF_DIR - -STORM_LIB_DIR = os.path.join(STORM_DIR, "lib") -STORM_BIN_DIR = os.path.join(STORM_DIR, "bin") -STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback") - -init_storm_env() - -CONFIG_OPTS = [] -CONFFILE = "" -JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', '')) -JAVA_HOME = os.getenv('JAVA_HOME', None) -JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java') - -def get_config_opts(): - global CONFIG_OPTS - return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS)) - -if not os.path.exists(STORM_LIB_DIR): - print("******************************************") - print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.") - print("\nYou can download a Storm release at http://storm-project.net/downloads.html") - print("******************************************") - sys.exit(1) - -def get_jars_full(adir): - files = os.listdir(adir) - ret = [] - for f in files: - if f.endswith(".jar"): - ret.append(os.path.join(adir, f)) - return ret - -def get_classpath(extrajars): - ret = get_jars_full(STORM_DIR) - ret.extend(get_jars_full(STORM_LIB_DIR)) - ret.extend(extrajars) - return normclasspath(os.pathsep.join(ret)) - -def confvalue(name, extrapaths): - global CONFFILE - command = [ - JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, - "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name - ] - p = sub.Popen(command, stdout=sub.PIPE) - output, errors = p.communicate() - # python 3 - if not isinstance(output, str): - output = output.decode('utf-8') - lines = output.split(os.linesep) - for line in lines: - tokens = line.split(" ") - if tokens[0] == "VALUE:": - return " ".join(tokens[1:]) - return "" - -def print_localconfvalue(name): - """Syntax: [storm localconfvalue conf-name] - - Prints out the value for conf-name in the local Storm configs. - The local Storm configs are the ones in ~/.storm/storm.yaml merged - in with the configs in defaults.yaml. - """ - print(name + ": " + confvalue(name, [USER_CONF_DIR])) - -def print_remoteconfvalue(name): - """Syntax: [storm remoteconfvalue conf-name] - - Prints out the value for conf-name in the cluster's Storm configs. - The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml - merged in with the configs in defaults.yaml. - - This command must be run on a cluster machine. - """ - print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR])) - -def parse_args(string): - r"""Takes a string of whitespace-separated tokens and parses it into a list. - Whitespace inside tokens may be quoted with single quotes, double quotes or - backslash (similar to command-line arguments in bash). - - >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''') - ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n'] - """ - re_split = re.compile(r'''((?: - [^\s"'\\] | - "(?: [^"\\] | \\.)*" | - '(?: [^'\\] | \\.)*' | - \\. - )+)''', re.VERBOSE) - args = re_split.split(string)[1::2] - args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args] - args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args] - return [re.compile(r'\\(.)').sub('\\1', x) for x in args] - -def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False): - global CONFFILE - storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) - if(storm_log_dir == None or storm_log_dir == "nil"): - storm_log_dir = os.path.join(STORM_DIR, "logs") - all_args = [ - JAVA_CMD, jvmtype, get_config_opts(), - "-Dstorm.home=" + STORM_DIR, - "-Dstorm.log.dir=" + storm_log_dir, - "-Djava.library.path=" + confvalue("java.library.path", extrajars), - "-Dstorm.conf.file=" + CONFFILE, - "-cp", get_classpath(extrajars), - ] + jvmopts + [klass] + list(args) - print("Running: " + " ".join(all_args)) - if fork: - os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) - else: - # handling whitespaces in JAVA_CMD - sub.call(all_args) - -def jar(jarfile, klass, *args): - """Syntax: [storm jar topology-jar-path class ...] - - Runs the main method of class with the specified arguments. - The storm jars and configs in ~/.storm are put on the classpath. - The process is configured so that StormSubmitter - (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html) - will upload the jar at topology-jar-path when the topology is submitted. - """ - exec_storm_class( - klass, - jvmtype="-client", - extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR], - args=args, - jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile]) - -def kill(*args): - """Syntax: [storm kill topology-name [-w wait-time-secs]] - - Kills the topology with the name topology-name. Storm will - first deactivate the topology's spouts for the duration of - the topology's message timeout to allow all messages currently - being processed to finish processing. Storm will then shutdown - the workers and clean up their state. You can override the length - of time Storm waits between deactivation and shutdown with the -w flag. - """ - exec_storm_class( - "backtype.storm.command.kill_topology", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - - -def upload_credentials(*args): - """Syntax: [storm upload_credentials topology-name [credkey credvalue]*] - - Uploads a new set of credentials to a running topology - """ - exec_storm_class( - "backtype.storm.command.upload_credentials", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def activate(*args): - """Syntax: [storm activate topology-name] - - Activates the specified topology's spouts. - """ - exec_storm_class( - "backtype.storm.command.activate", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def listtopos(*args): - """Syntax: [storm list] - - List the running topologies and their statuses. - """ - exec_storm_class( - "backtype.storm.command.list", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def deactivate(*args): - """Syntax: [storm deactivate topology-name] - - Deactivates the specified topology's spouts. - """ - exec_storm_class( - "backtype.storm.command.deactivate", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def rebalance(*args): - """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] - - Sometimes you may wish to spread out where the workers for a topology - are running. For example, let's say you have a 10 node cluster running - 4 workers per node, and then let's say you add another 10 nodes to - the cluster. You may wish to have Storm spread out the workers for the - running topology so that each node runs 2 workers. One way to do this - is to kill the topology and resubmit it, but Storm provides a "rebalance" - command that provides an easier way to do this. - - Rebalance will first deactivate the topology for the duration of the - message timeout (overridable with the -w flag) and then redistribute - the workers evenly around the cluster. The topology will then return to - its previous state of activation (so a deactivated topology will still - be deactivated and an activated topology will go back to being activated). - - The rebalance command can also be used to change the parallelism of a running topology. - Use the -n and -e switches to change the number of workers or number of executors of a component - respectively. - """ - exec_storm_class( - "backtype.storm.command.rebalance", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def shell(resourcesdir, command, *args): - tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" - os.system("jar cf %s %s" % (tmpjarpath, resourcesdir)) - runnerargs = [tmpjarpath, command] - runnerargs.extend(args) - exec_storm_class( - "backtype.storm.command.shell_submission", - args=runnerargs, - jvmtype="-client", - extrajars=[USER_CONF_DIR], - fork=True) - os.system("rm " + tmpjarpath) - -def repl(): - """Syntax: [storm repl] - - Opens up a Clojure REPL with the storm jars and configuration - on the classpath. Useful for debugging. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths) - -def get_logback_conf_dir(): - cppaths = [CLUSTER_CONF_DIR] - storm_logback_conf_dir = confvalue("storm.logback.conf.dir", cppaths) - if(storm_logback_conf_dir == None or storm_logback_conf_dir == "nil"): - storm_logback_conf_dir = STORM_LOGBACK_CONF_DIR - return storm_logback_conf_dir - -def nimbus(klass="backtype.storm.daemon.nimbus"): - """Syntax: [storm nimbus] - - Launches the nimbus daemon. This command should be run under - supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ - "-Dlogfile.name=nimbus.log", - "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"), - ] - exec_storm_class( - klass, - jvmtype="-server", - extrajars=cppaths, - jvmopts=jvmopts) - -def supervisor(klass="backtype.storm.daemon.supervisor"): - """Syntax: [storm supervisor] - - Launches the supervisor daemon. This command should be run - under supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [ - "-Dlogfile.name=supervisor.log", - "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"), - ] - exec_storm_class( - klass, - jvmtype="-server", - extrajars=cppaths, - jvmopts=jvmopts) - -def ui(): - """Syntax: [storm ui] - - Launches the UI daemon. The UI provides a web interface for a Storm - cluster and shows detailed stats about running topologies. This command - should be run under supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [ - "-Dlogfile.name=ui.log", - "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.ui.core", - jvmtype="-server", - jvmopts=jvmopts, - extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) - -def logviewer(): - """Syntax: [storm logviewer] - - Launches the log viewer daemon. It provides a web interface for viewing - storm log files. This command should be run under supervision with a - tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [ - "-Dlogfile.name=logviewer.log", - "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.daemon.logviewer", - jvmtype="-server", - jvmopts=jvmopts, - extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) - -def drpc(): - """Syntax: [storm drpc] - - Launches a DRPC daemon. This command should be run under supervision - with a tool like daemontools or monit. - - See Distributed RPC for more information. - (http://storm.incubator.apache.org/documentation/Distributed-RPC) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [ - "-Dlogfile.name=drpc.log", - "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.daemon.drpc", - jvmtype="-server", - jvmopts=jvmopts, - extrajars=[CLUSTER_CONF_DIR]) - -def dev_zookeeper(): - """Syntax: [storm dev-zookeeper] - - Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and - "storm.zookeeper.port" as its port. This is only intended for development/testing, the - Zookeeper instance launched is not configured to be used in production. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class( - "backtype.storm.command.dev_zookeeper", - jvmtype="-server", - extrajars=[CLUSTER_CONF_DIR]) - -def version(): - """Syntax: [storm version] - - Prints the version number of this Storm release. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class( - "backtype.storm.utils.VersionInfo", - jvmtype="-client", - extrajars=[CLUSTER_CONF_DIR]) - -def print_classpath(): - """Syntax: [storm classpath] - - Prints the classpath used by the storm client when running commands. - """ - print(get_classpath([])) - -def monitor(*args): - """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]] - - Monitor given topology's throughput interactively. - One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred] - By default, - poll-interval is 4 seconds; - all component-ids will be list; - stream-id is 'default'; - watch-item is 'emitted'; - """ - exec_storm_class( - "backtype.storm.command.monitor", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - - -def print_commands(): - """Print all client commands and link to documentation""" - print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys()))) - print("\nHelp: \n\thelp \n\thelp <command>") - print("\nDocumentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html\n") - print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n") - -def print_usage(command=None): - """Print one help message or list of available commands""" - if command != None: - if command in COMMANDS: - print(COMMANDS[command].__doc__ or - "No documentation provided for <%s>" % command) - else: - print("<%s> is not a valid command" % command) - else: - print_commands() - -def unknown_command(*args): - print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:])) - print_usage() - sys.exit(254) - -COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer, - "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, - "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, - "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, - "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor, - "upload-credentials": upload_credentials} - -def parse_config(config_list): - global CONFIG_OPTS - if len(config_list) > 0: - for config in config_list: - CONFIG_OPTS.append(config) - -def parse_config_opts(args): - curr = args[:] - curr.reverse() - config_list = [] - args_list = [] - - while len(curr) > 0: - token = curr.pop() - if token == "-c": - config_list.append(curr.pop()) - elif token == "--config": - global CONFFILE - CONFFILE = curr.pop() - else: - args_list.append(token) - - return config_list, args_list - -def main(): - if len(sys.argv) <= 1: - print_usage() - sys.exit(-1) - global CONFIG_OPTS - config_list, args = parse_config_opts(sys.argv[1:]) - parse_config(config_list) - COMMAND = args[0] - ARGS = args[1:] - (COMMANDS.get(COMMAND, unknown_command))(*ARGS) - -if __name__ == "__main__": - main() +# Resolve links - $0 may be a softlink +PRG="${0}" + +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +# find python >= 2.6 +if [ -a /usr/bin/python2.6 ]; then + PYTHON=/usr/bin/python2.6 +fi + +if [ -z "$PYTHON" ]; then + PYTHON=/usr/bin/python +fi + +# check for version +majversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f1` +minversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f2` +numversion=$(( 10 * $majversion + $minversion)) +if (( $numversion < 26 )); then + echo "Need python version > 2.6" + exit 1 +fi + +STORM_BIN_DIR=`dirname ${PRG}` +export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd` + +#check to see if the conf dir is given as an optional argument +if [ $# -gt 1 ] +then + if [ "--config" = "$1" ] + then + conf_file=$2 + if [ ! -f "$conf_file" ]; then + echo "Error: Cannot find configuration directory: $conf_file" + exit 1 + fi + STORM_CONF_FILE=$conf_file + STORM_CONF_DIR=`dirname $conf_file` + fi +fi + +export STORM_CONF_DIR="${STORM_CONF_DIR:-$STORM_BASE_DIR/conf}" +export STORM_CONF_FILE="${STORM_CONF_FILE:-$STORM_BASE_DIR/conf/storm.yaml}" + +if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then + . "${STORM_CONF_DIR}/storm-env.sh" +fi + +exec $PYTHON ${STORM_BIN_DIR}/storm.py $@ http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py new file mode 100755 index 0000000..2cf6c32 --- /dev/null +++ b/bin/storm.py @@ -0,0 +1,543 @@ +#!/usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import random +import subprocess as sub +import re +import shlex +try: + # python 3 + from urllib.parse import quote_plus +except ImportError: + # python 2 + from urllib import quote_plus +try: + # python 3 + import configparser +except ImportError: + # python 2 + import ConfigParser as configparser + +def is_windows(): + return sys.platform.startswith('win') + +def identity(x): + return x + +def cygpath(x): + command = ["cygpath", "-wp", x] + p = sub.Popen(command,stdout=sub.PIPE) + output, errors = p.communicate() + lines = output.split(os.linesep) + return lines[0] + +def init_storm_env(): + global CLUSTER_CONF_DIR + ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini') + if not os.path.isfile(ini_file): + return + config = configparser.ConfigParser() + config.optionxform = str + config.read(ini_file) + options = config.options('environment') + for option in options: + value = config.get('environment', option) + os.environ[option] = value + +normclasspath = cygpath if sys.platform == 'cygwin' else identity +STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2]) +USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") +STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None) + +if STORM_CONF_DIR == None: + CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf") +else: + CLUSTER_CONF_DIR = STORM_CONF_DIR + +if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))): + USER_CONF_DIR = CLUSTER_CONF_DIR + +STORM_LIB_DIR = os.path.join(STORM_DIR, "lib") +STORM_BIN_DIR = os.path.join(STORM_DIR, "bin") +STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback") + +init_storm_env() + +CONFIG_OPTS = [] +CONFFILE = "" +JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', '')) +JAVA_HOME = os.getenv('JAVA_HOME', None) +JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java') + +def get_config_opts(): + global CONFIG_OPTS + return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS)) + +if not os.path.exists(STORM_LIB_DIR): + print("******************************************") + print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.") + print("\nYou can download a Storm release at http://storm-project.net/downloads.html") + print("******************************************") + sys.exit(1) + +def get_jars_full(adir): + files = os.listdir(adir) + ret = [] + for f in files: + if f.endswith(".jar"): + ret.append(os.path.join(adir, f)) + return ret + +def get_classpath(extrajars): + ret = get_jars_full(STORM_DIR) + ret.extend(get_jars_full(STORM_LIB_DIR)) + ret.extend(extrajars) + return normclasspath(os.pathsep.join(ret)) + +def confvalue(name, extrapaths): + global CONFFILE + command = [ + JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, + "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name + ] + p = sub.Popen(command, stdout=sub.PIPE) + output, errors = p.communicate() + # python 3 + if not isinstance(output, str): + output = output.decode('utf-8') + lines = output.split(os.linesep) + for line in lines: + tokens = line.split(" ") + if tokens[0] == "VALUE:": + return " ".join(tokens[1:]) + return "" + +def print_localconfvalue(name): + """Syntax: [storm localconfvalue conf-name] + + Prints out the value for conf-name in the local Storm configs. + The local Storm configs are the ones in ~/.storm/storm.yaml merged + in with the configs in defaults.yaml. + """ + print(name + ": " + confvalue(name, [USER_CONF_DIR])) + +def print_remoteconfvalue(name): + """Syntax: [storm remoteconfvalue conf-name] + + Prints out the value for conf-name in the cluster's Storm configs. + The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml + merged in with the configs in defaults.yaml. + + This command must be run on a cluster machine. + """ + print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR])) + +def parse_args(string): + r"""Takes a string of whitespace-separated tokens and parses it into a list. + Whitespace inside tokens may be quoted with single quotes, double quotes or + backslash (similar to command-line arguments in bash). + + >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''') + ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n'] + """ + re_split = re.compile(r'''((?: + [^\s"'\\] | + "(?: [^"\\] | \\.)*" | + '(?: [^'\\] | \\.)*' | + \\. + )+)''', re.VERBOSE) + args = re_split.split(string)[1::2] + args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args] + args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args] + return [re.compile(r'\\(.)').sub('\\1', x) for x in args] + +def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False): + global CONFFILE + storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) + if(storm_log_dir == None or storm_log_dir == "nil"): + storm_log_dir = os.path.join(STORM_DIR, "logs") + all_args = [ + "java", jvmtype, get_config_opts(), + "-Dstorm.home=" + STORM_DIR, + "-Dstorm.log.dir=" + storm_log_dir, + "-Djava.library.path=" + confvalue("java.library.path", extrajars), + "-Dstorm.conf.file=" + CONFFILE, + "-cp", get_classpath(extrajars), + ] + jvmopts + [klass] + list(args) + print("Running: " + " ".join(all_args)) + if fork: + os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) + elif is_windows(): + # handling whitespaces in JAVA_CMD + sub.call(all_args) + else: + os.execvp(JAVA_CMD, all_args) + +def jar(jarfile, klass, *args): + """Syntax: [storm jar topology-jar-path class ...] + + Runs the main method of class with the specified arguments. + The storm jars and configs in ~/.storm are put on the classpath. + The process is configured so that StormSubmitter + (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html) + will upload the jar at topology-jar-path when the topology is submitted. + """ + exec_storm_class( + klass, + jvmtype="-client", + extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR], + args=args, + jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile]) + +def kill(*args): + """Syntax: [storm kill topology-name [-w wait-time-secs]] + + Kills the topology with the name topology-name. Storm will + first deactivate the topology's spouts for the duration of + the topology's message timeout to allow all messages currently + being processed to finish processing. Storm will then shutdown + the workers and clean up their state. You can override the length + of time Storm waits between deactivation and shutdown with the -w flag. + """ + exec_storm_class( + "backtype.storm.command.kill_topology", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + + +def upload_credentials(*args): + """Syntax: [storm upload_credentials topology-name [credkey credvalue]*] + + Uploads a new set of credentials to a running topology + """ + exec_storm_class( + "backtype.storm.command.upload_credentials", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + +def activate(*args): + """Syntax: [storm activate topology-name] + + Activates the specified topology's spouts. + """ + exec_storm_class( + "backtype.storm.command.activate", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + +def listtopos(*args): + """Syntax: [storm list] + + List the running topologies and their statuses. + """ + exec_storm_class( + "backtype.storm.command.list", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + +def deactivate(*args): + """Syntax: [storm deactivate topology-name] + + Deactivates the specified topology's spouts. + """ + exec_storm_class( + "backtype.storm.command.deactivate", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + +def rebalance(*args): + """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] + + Sometimes you may wish to spread out where the workers for a topology + are running. For example, let's say you have a 10 node cluster running + 4 workers per node, and then let's say you add another 10 nodes to + the cluster. You may wish to have Storm spread out the workers for the + running topology so that each node runs 2 workers. One way to do this + is to kill the topology and resubmit it, but Storm provides a "rebalance" + command that provides an easier way to do this. + + Rebalance will first deactivate the topology for the duration of the + message timeout (overridable with the -w flag) and then redistribute + the workers evenly around the cluster. The topology will then return to + its previous state of activation (so a deactivated topology will still + be deactivated and an activated topology will go back to being activated). + + The rebalance command can also be used to change the parallelism of a running topology. + Use the -n and -e switches to change the number of workers or number of executors of a component + respectively. + """ + exec_storm_class( + "backtype.storm.command.rebalance", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + +def shell(resourcesdir, command, *args): + tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" + os.system("jar cf %s %s" % (tmpjarpath, resourcesdir)) + runnerargs = [tmpjarpath, command] + runnerargs.extend(args) + exec_storm_class( + "backtype.storm.command.shell_submission", + args=runnerargs, + jvmtype="-client", + extrajars=[USER_CONF_DIR], + fork=True) + os.system("rm " + tmpjarpath) + +def repl(): + """Syntax: [storm repl] + + Opens up a Clojure REPL with the storm jars and configuration + on the classpath. Useful for debugging. + """ + cppaths = [CLUSTER_CONF_DIR] + exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths) + +def get_logback_conf_dir(): + cppaths = [CLUSTER_CONF_DIR] + storm_logback_conf_dir = confvalue("storm.logback.conf.dir", cppaths) + if(storm_logback_conf_dir == None or storm_logback_conf_dir == "nil"): + storm_logback_conf_dir = STORM_LOGBACK_CONF_DIR + return storm_logback_conf_dir + +def nimbus(klass="backtype.storm.daemon.nimbus"): + """Syntax: [storm nimbus] + + Launches the nimbus daemon. This command should be run under + supervision with a tool like daemontools or monit. + + See Setting up a Storm cluster for more information. + (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) + """ + cppaths = [CLUSTER_CONF_DIR] + jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ + "-Dlogfile.name=nimbus.log", + "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"), + ] + exec_storm_class( + klass, + jvmtype="-server", + extrajars=cppaths, + jvmopts=jvmopts) + +def supervisor(klass="backtype.storm.daemon.supervisor"): + """Syntax: [storm supervisor] + + Launches the supervisor daemon. This command should be run + under supervision with a tool like daemontools or monit. + + See Setting up a Storm cluster for more information. + (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) + """ + cppaths = [CLUSTER_CONF_DIR] + jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [ + "-Dlogfile.name=supervisor.log", + "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml"), + ] + exec_storm_class( + klass, + jvmtype="-server", + extrajars=cppaths, + jvmopts=jvmopts) + +def ui(): + """Syntax: [storm ui] + + Launches the UI daemon. The UI provides a web interface for a Storm + cluster and shows detailed stats about running topologies. This command + should be run under supervision with a tool like daemontools or monit. + + See Setting up a Storm cluster for more information. + (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) + """ + cppaths = [CLUSTER_CONF_DIR] + jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [ + "-Dlogfile.name=ui.log", + "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") + ] + exec_storm_class( + "backtype.storm.ui.core", + jvmtype="-server", + jvmopts=jvmopts, + extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) + +def logviewer(): + """Syntax: [storm logviewer] + + Launches the log viewer daemon. It provides a web interface for viewing + storm log files. This command should be run under supervision with a + tool like daemontools or monit. + + See Setting up a Storm cluster for more information. + (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) + """ + cppaths = [CLUSTER_CONF_DIR] + jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [ + "-Dlogfile.name=logviewer.log", + "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") + ] + exec_storm_class( + "backtype.storm.daemon.logviewer", + jvmtype="-server", + jvmopts=jvmopts, + extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) + +def drpc(): + """Syntax: [storm drpc] + + Launches a DRPC daemon. This command should be run under supervision + with a tool like daemontools or monit. + + See Distributed RPC for more information. + (http://storm.incubator.apache.org/documentation/Distributed-RPC) + """ + cppaths = [CLUSTER_CONF_DIR] + jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [ + "-Dlogfile.name=drpc.log", + "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), "cluster.xml") + ] + exec_storm_class( + "backtype.storm.daemon.drpc", + jvmtype="-server", + jvmopts=jvmopts, + extrajars=[CLUSTER_CONF_DIR]) + +def dev_zookeeper(): + """Syntax: [storm dev-zookeeper] + + Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and + "storm.zookeeper.port" as its port. This is only intended for development/testing, the + Zookeeper instance launched is not configured to be used in production. + """ + cppaths = [CLUSTER_CONF_DIR] + exec_storm_class( + "backtype.storm.command.dev_zookeeper", + jvmtype="-server", + extrajars=[CLUSTER_CONF_DIR]) + +def version(): + """Syntax: [storm version] + + Prints the version number of this Storm release. + """ + cppaths = [CLUSTER_CONF_DIR] + exec_storm_class( + "backtype.storm.utils.VersionInfo", + jvmtype="-client", + extrajars=[CLUSTER_CONF_DIR]) + +def print_classpath(): + """Syntax: [storm classpath] + + Prints the classpath used by the storm client when running commands. + """ + print(get_classpath([])) + +def monitor(*args): + """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]] + + Monitor given topology's throughput interactively. + One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred] + By default, + poll-interval is 4 seconds; + all component-ids will be list; + stream-id is 'default'; + watch-item is 'emitted'; + """ + exec_storm_class( + "backtype.storm.command.monitor", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + + +def print_commands(): + """Print all client commands and link to documentation""" + print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys()))) + print("\nHelp: \n\thelp \n\thelp <command>") + print("\nDocumentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html\n") + print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n") + +def print_usage(command=None): + """Print one help message or list of available commands""" + if command != None: + if command in COMMANDS: + print(COMMANDS[command].__doc__ or + "No documentation provided for <%s>" % command) + else: + print("<%s> is not a valid command" % command) + else: + print_commands() + +def unknown_command(*args): + print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:])) + print_usage() + sys.exit(254) + +COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer, + "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, + "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, + "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, + "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor, + "upload-credentials": upload_credentials} + +def parse_config(config_list): + global CONFIG_OPTS + if len(config_list) > 0: + for config in config_list: + CONFIG_OPTS.append(config) + +def parse_config_opts(args): + curr = args[:] + curr.reverse() + config_list = [] + args_list = [] + + while len(curr) > 0: + token = curr.pop() + if token == "-c": + config_list.append(curr.pop()) + elif token == "--config": + global CONFFILE + CONFFILE = curr.pop() + else: + args_list.append(token) + + return config_list, args_list + +def main(): + if len(sys.argv) <= 1: + print_usage() + sys.exit(-1) + global CONFIG_OPTS + config_list, args = parse_config_opts(sys.argv[1:]) + parse_config(config_list) + COMMAND = args[0] + ARGS = args[1:] + (COMMANDS.get(COMMAND, unknown_command))(*ARGS) + +if __name__ == "__main__": + main() http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/conf/storm-env.sh ---------------------------------------------------------------------- diff --git a/conf/storm-env.sh b/conf/storm-env.sh new file mode 100644 index 0000000..f00525c --- /dev/null +++ b/conf/storm-env.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# +# Copyright 2014 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set Storm specific environment variables here. + +# The java implementation to use. +export JAVA_HOME=${JAVA_HOME} + +# export STORM_CONF_DIR=""