Repository: storm
Updated Branches:
  refs/heads/master 5eff2e761 -> 07098e8eb


STORM-675. Allow users to have storm-env.sh under config dir to set custom 
JAVA_HOME and other env variables.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4a8f7d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4a8f7d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4a8f7d2

Branch: refs/heads/master
Commit: a4a8f7d22bf79a04d4cddd067f2a63766084ebfe
Parents: 5ca7e80
Author: Sriharsha Chintalapani <m...@harsha.io>
Authored: Tue Feb 17 17:11:50 2015 -0800
Committer: Sriharsha Chintalapani <m...@harsha.io>
Committed: Tue Feb 17 17:11:50 2015 -0800

----------------------------------------------------------------------
 bin/storm         | 586 ++++++-------------------------------------------
 bin/storm.py      | 543 +++++++++++++++++++++++++++++++++++++++++++++
 conf/storm-env.sh |  26 +++
 3 files changed, 632 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/bin/storm
----------------------------------------------------------------------
diff --git a/bin/storm b/bin/storm
index 410a1b0..fffd839 100755
--- a/bin/storm
+++ b/bin/storm
@@ -1,5 +1,7 @@
-#!/usr/bin/python
-
+#!/bin/bash
+#
+# Copyright 2014 The Apache Software Foundation
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -8,531 +10,69 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
 
-import os
-import sys
-import random
-import subprocess as sub
-import re
-import shlex
-try:
-    # python 3
-    from urllib.parse import quote_plus
-except ImportError:
-    # python 2
-    from urllib import quote_plus
-try:
-    # python 3
-    import configparser
-except ImportError:
-    # python 2
-    import ConfigParser as configparser
-
-def identity(x):
-    return x
-
-def cygpath(x):
-    command = ["cygpath", "-wp", x]
-    p = sub.Popen(command,stdout=sub.PIPE)
-    output, errors = p.communicate()
-    lines = output.split(os.linesep)
-    return lines[0]
-
-def init_storm_env():
-    global CLUSTER_CONF_DIR
-    ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini')
-    if not os.path.isfile(ini_file):
-        return
-    config = configparser.ConfigParser()
-    config.optionxform = str
-    config.read(ini_file)
-    options = config.options('environment')
-    for option in options:
-        value = config.get('environment', option)
-        os.environ[option] = value
-
-normclasspath = cygpath if sys.platform == 'cygwin' else identity
-STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2])
-USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm")
-STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None)
-
-if STORM_CONF_DIR == None:
-    CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf")
-else:
-    CLUSTER_CONF_DIR = STORM_CONF_DIR
-
-if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))):
-    USER_CONF_DIR = CLUSTER_CONF_DIR
-
-STORM_LIB_DIR = os.path.join(STORM_DIR, "lib")
-STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")
-STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback")
-
-init_storm_env()
-
-CONFIG_OPTS = []
-CONFFILE = ""
-JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', ''))
-JAVA_HOME = os.getenv('JAVA_HOME', None)
-JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java')
-
-def get_config_opts():
-    global CONFIG_OPTS
-    return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS))
-
-if not os.path.exists(STORM_LIB_DIR):
-    print("******************************************")
-    print("The storm client can only be run from within a release. You appear 
to be trying to run the client from a checkout of Storm's source code.")
-    print("\nYou can download a Storm release at 
http://storm-project.net/downloads.html";)
-    print("******************************************")
-    sys.exit(1)
-
-def get_jars_full(adir):
-    files = os.listdir(adir)
-    ret = []
-    for f in files:
-        if f.endswith(".jar"):
-            ret.append(os.path.join(adir, f))
-    return ret
-
-def get_classpath(extrajars):
-    ret = get_jars_full(STORM_DIR)
-    ret.extend(get_jars_full(STORM_LIB_DIR))
-    ret.extend(extrajars)
-    return normclasspath(os.pathsep.join(ret))
-
-def confvalue(name, extrapaths):
-    global CONFFILE
-    command = [
-        JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + 
CONFFILE,
-        "-cp", get_classpath(extrapaths), 
"backtype.storm.command.config_value", name
-    ]
-    p = sub.Popen(command, stdout=sub.PIPE)
-    output, errors = p.communicate()
-    # python 3
-    if not isinstance(output, str):
-        output = output.decode('utf-8')
-    lines = output.split(os.linesep)
-    for line in lines:
-        tokens = line.split(" ")
-        if tokens[0] == "VALUE:":
-            return " ".join(tokens[1:])
-    return ""
-
-def print_localconfvalue(name):
-    """Syntax: [storm localconfvalue conf-name]
-
-    Prints out the value for conf-name in the local Storm configs. 
-    The local Storm configs are the ones in ~/.storm/storm.yaml merged 
-    in with the configs in defaults.yaml.
-    """
-    print(name + ": " + confvalue(name, [USER_CONF_DIR]))
-
-def print_remoteconfvalue(name):
-    """Syntax: [storm remoteconfvalue conf-name]
-
-    Prints out the value for conf-name in the cluster's Storm configs. 
-    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml 
-    merged in with the configs in defaults.yaml. 
-
-    This command must be run on a cluster machine.
-    """
-    print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR]))
-
-def parse_args(string):
-    r"""Takes a string of whitespace-separated tokens and parses it into a 
list.
-    Whitespace inside tokens may be quoted with single quotes, double quotes or
-    backslash (similar to command-line arguments in bash).
-
-    >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 
'j''j' k" "k l' l' mm n\\n''')
-    ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l 
l', 'mm', r'n\n']
-    """
-    re_split = re.compile(r'''((?:
-        [^\s"'\\] |
-        "(?: [^"\\] | \\.)*" |
-        '(?: [^'\\] | \\.)*' |
-        \\.
-    )+)''', re.VERBOSE)
-    args = re_split.split(string)[1::2]
-    args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args]
-    args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
-    return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
-
-def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], 
args=[], fork=False):
-    global CONFFILE
-    storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])
-    if(storm_log_dir == None or storm_log_dir == "nil"):
-        storm_log_dir = os.path.join(STORM_DIR, "logs")
-    all_args = [
-        JAVA_CMD, jvmtype, get_config_opts(),
-        "-Dstorm.home=" + STORM_DIR,
-        "-Dstorm.log.dir=" + storm_log_dir, 
-        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
-        "-Dstorm.conf.file=" + CONFFILE,
-        "-cp", get_classpath(extrajars),
-    ] + jvmopts + [klass] + list(args)
-    print("Running: " + " ".join(all_args))
-    if fork:
-        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
-    else:
-        # handling whitespaces in JAVA_CMD
-        sub.call(all_args)
-
-def jar(jarfile, klass, *args):
-    """Syntax: [storm jar topology-jar-path class ...]
-
-    Runs the main method of class with the specified arguments. 
-    The storm jars and configs in ~/.storm are put on the classpath. 
-    The process is configured so that StormSubmitter 
-    
(http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
-    will upload the jar at topology-jar-path when the topology is submitted.
-    """
-    exec_storm_class(
-        klass,
-        jvmtype="-client",
-        extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],
-        args=args,
-        jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
-
-def kill(*args):
-    """Syntax: [storm kill topology-name [-w wait-time-secs]]
-
-    Kills the topology with the name topology-name. Storm will 
-    first deactivate the topology's spouts for the duration of 
-    the topology's message timeout to allow all messages currently 
-    being processed to finish processing. Storm will then shutdown 
-    the workers and clean up their state. You can override the length 
-    of time Storm waits between deactivation and shutdown with the -w flag.
-    """
-    exec_storm_class(
-        "backtype.storm.command.kill_topology", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-
-def upload_credentials(*args):
-    """Syntax: [storm upload_credentials topology-name [credkey credvalue]*]
-
-    Uploads a new set of credentials to a running topology
-    """
-    exec_storm_class(
-        "backtype.storm.command.upload_credentials", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-def activate(*args):
-    """Syntax: [storm activate topology-name]
-
-    Activates the specified topology's spouts.
-    """
-    exec_storm_class(
-        "backtype.storm.command.activate", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-def listtopos(*args):
-    """Syntax: [storm list]
-
-    List the running topologies and their statuses.
-    """
-    exec_storm_class(
-        "backtype.storm.command.list", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-def deactivate(*args):
-    """Syntax: [storm deactivate topology-name]
-
-    Deactivates the specified topology's spouts.
-    """
-    exec_storm_class(
-        "backtype.storm.command.deactivate", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-def rebalance(*args):
-    """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n 
new-num-workers] [-e component=parallelism]*]
-
-    Sometimes you may wish to spread out where the workers for a topology 
-    are running. For example, let's say you have a 10 node cluster running 
-    4 workers per node, and then let's say you add another 10 nodes to 
-    the cluster. You may wish to have Storm spread out the workers for the 
-    running topology so that each node runs 2 workers. One way to do this 
-    is to kill the topology and resubmit it, but Storm provides a "rebalance" 
-    command that provides an easier way to do this.
-
-    Rebalance will first deactivate the topology for the duration of the 
-    message timeout (overridable with the -w flag) and then redistribute 
-    the workers evenly around the cluster. The topology will then return to 
-    its previous state of activation (so a deactivated topology will still 
-    be deactivated and an activated topology will go back to being activated).
-    
-    The rebalance command can also be used to change the parallelism of a 
running topology.
-    Use the -n and -e switches to change the number of workers or number of 
executors of a component
-    respectively.
-    """
-    exec_storm_class(
-        "backtype.storm.command.rebalance", 
-        args=args, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-def shell(resourcesdir, command, *args):
-    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
-    os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
-    runnerargs = [tmpjarpath, command]
-    runnerargs.extend(args)
-    exec_storm_class(
-        "backtype.storm.command.shell_submission", 
-        args=runnerargs, 
-        jvmtype="-client", 
-        extrajars=[USER_CONF_DIR],
-        fork=True)
-    os.system("rm " + tmpjarpath)
-
-def repl():
-    """Syntax: [storm repl]
-
-    Opens up a Clojure REPL with the storm jars and configuration 
-    on the classpath. Useful for debugging.
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths)
-
-def get_logback_conf_dir():
-    cppaths = [CLUSTER_CONF_DIR]
-    storm_logback_conf_dir = confvalue("storm.logback.conf.dir", cppaths)
-    if(storm_logback_conf_dir == None or storm_logback_conf_dir == "nil"):
-        storm_logback_conf_dir = STORM_LOGBACK_CONF_DIR
-    return storm_logback_conf_dir
-
-def nimbus(klass="backtype.storm.daemon.nimbus"):
-    """Syntax: [storm nimbus]
-
-    Launches the nimbus daemon. This command should be run under 
-    supervision with a tool like daemontools or monit. 
-
-    See Setting up a Storm cluster for more information.
-    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
-        "-Dlogfile.name=nimbus.log",
-        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml"),
-    ]
-    exec_storm_class(
-        klass, 
-        jvmtype="-server", 
-        extrajars=cppaths, 
-        jvmopts=jvmopts)
-
-def supervisor(klass="backtype.storm.daemon.supervisor"):
-    """Syntax: [storm supervisor]
-
-    Launches the supervisor daemon. This command should be run 
-    under supervision with a tool like daemontools or monit. 
-
-    See Setting up a Storm cluster for more information.
-    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
-        "-Dlogfile.name=supervisor.log",
-        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml"),
-    ]
-    exec_storm_class(
-        klass, 
-        jvmtype="-server", 
-        extrajars=cppaths, 
-        jvmopts=jvmopts)
-
-def ui():
-    """Syntax: [storm ui]
-
-    Launches the UI daemon. The UI provides a web interface for a Storm 
-    cluster and shows detailed stats about running topologies. This command 
-    should be run under supervision with a tool like daemontools or monit. 
-
-    See Setting up a Storm cluster for more information.
-    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
-        "-Dlogfile.name=ui.log",
-        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
-    ]
-    exec_storm_class(
-        "backtype.storm.ui.core", 
-        jvmtype="-server", 
-        jvmopts=jvmopts, 
-        extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
-
-def logviewer():
-    """Syntax: [storm logviewer]
-
-    Launches the log viewer daemon. It provides a web interface for viewing
-    storm log files. This command should be run under supervision with a 
-    tool like daemontools or monit. 
-
-    See Setting up a Storm cluster for more information.
-    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [
-        "-Dlogfile.name=logviewer.log",
-        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
-    ]
-    exec_storm_class(
-        "backtype.storm.daemon.logviewer", 
-        jvmtype="-server", 
-        jvmopts=jvmopts, 
-        extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
-
-def drpc():
-    """Syntax: [storm drpc]
-
-    Launches a DRPC daemon. This command should be run under supervision 
-    with a tool like daemontools or monit. 
-
-    See Distributed RPC for more information.
-    (http://storm.incubator.apache.org/documentation/Distributed-RPC)
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [
-        "-Dlogfile.name=drpc.log",
-        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
-    ]
-    exec_storm_class(
-        "backtype.storm.daemon.drpc", 
-        jvmtype="-server", 
-        jvmopts=jvmopts, 
-        extrajars=[CLUSTER_CONF_DIR])
-
-def dev_zookeeper():
-    """Syntax: [storm dev-zookeeper]
-  
-    Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local 
dir and
-    "storm.zookeeper.port" as its port. This is only intended for 
development/testing, the
-    Zookeeper instance launched is not configured to be used in production.
-    """
-    cppaths = [CLUSTER_CONF_DIR]
-    exec_storm_class(
-        "backtype.storm.command.dev_zookeeper", 
-        jvmtype="-server", 
-        extrajars=[CLUSTER_CONF_DIR])
-
-def version():
-  """Syntax: [storm version]
-
-  Prints the version number of this Storm release.
-  """
-  cppaths = [CLUSTER_CONF_DIR]
-  exec_storm_class(
-       "backtype.storm.utils.VersionInfo",
-       jvmtype="-client",
-       extrajars=[CLUSTER_CONF_DIR])
-
-def print_classpath():
-    """Syntax: [storm classpath]
-
-    Prints the classpath used by the storm client when running commands.
-    """
-    print(get_classpath([]))
-
-def monitor(*args):
-    """Syntax: [storm monitor topology-name [-i interval-secs] [-m 
component-id] [-s stream-id] [-w [emitted | transferred]]]
-
-    Monitor given topology's throughput interactively.
-    One can specify poll-interval, component-id, stream-id, watch-item[emitted 
| transferred]
-    By default,
-        poll-interval is 4 seconds;
-        all component-ids will be list;
-        stream-id is 'default';
-        watch-item is 'emitted';
-    """
-    exec_storm_class(
-        "backtype.storm.command.monitor",
-        args=args,
-        jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
-
-
-def print_commands():
-    """Print all client commands and link to documentation"""
-    print("Commands:\n\t" +  "\n\t".join(sorted(COMMANDS.keys())))
-    print("\nHelp: \n\thelp \n\thelp <command>")
-    print("\nDocumentation for the storm client can be found at 
http://storm.incubator.apache.org/documentation/Command-line-client.html\n";)
-    print("Configs can be overridden using one or more -c flags, e.g. \"storm 
list -c nimbus.host=nimbus.mycompany.com\"\n")
-
-def print_usage(command=None):
-    """Print one help message or list of available commands"""
-    if command != None:
-        if command in COMMANDS:
-            print(COMMANDS[command].__doc__ or
-                  "No documentation provided for <%s>" % command)
-        else:
-           print("<%s> is not a valid command" % command)
-    else:
-        print_commands()
-
-def unknown_command(*args):
-    print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:]))
-    print_usage()
-    sys.exit(254)
-
-COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": 
ui, "logviewer": logviewer,
-            "drpc": drpc, "supervisor": supervisor, "localconfvalue": 
print_localconfvalue,
-            "remoteconfvalue": print_remoteconfvalue, "repl": repl, 
"classpath": print_classpath,
-            "activate": activate, "deactivate": deactivate, "rebalance": 
rebalance, "help": print_usage,
-            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": 
version, "monitor": monitor,
-            "upload-credentials": upload_credentials}
-
-def parse_config(config_list):
-    global CONFIG_OPTS
-    if len(config_list) > 0:
-        for config in config_list:
-            CONFIG_OPTS.append(config)
-
-def parse_config_opts(args):
-  curr = args[:]
-  curr.reverse()
-  config_list = []
-  args_list = []
-  
-  while len(curr) > 0:
-    token = curr.pop()
-    if token == "-c":
-      config_list.append(curr.pop())
-    elif token == "--config":
-      global CONFFILE
-      CONFFILE = curr.pop()
-    else:
-      args_list.append(token)
-  
-  return config_list, args_list
-    
-def main():
-    if len(sys.argv) <= 1:
-        print_usage()
-        sys.exit(-1)
-    global CONFIG_OPTS
-    config_list, args = parse_config_opts(sys.argv[1:])
-    parse_config(config_list)
-    COMMAND = args[0]
-    ARGS = args[1:]
-    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
-    
-if __name__ == "__main__":
-    main()
+# Resolve links - $0 may be a softlink
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+# find python >= 2.6
+if [ -a /usr/bin/python2.6 ]; then
+  PYTHON=/usr/bin/python2.6
+fi
+
+if [ -z "$PYTHON" ]; then
+  PYTHON=/usr/bin/python
+fi
+
+# check for version
+majversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f1`
+minversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f2`
+numversion=$(( 10 * $majversion + $minversion))
+if (( $numversion < 26 )); then
+  echo "Need python version > 2.6"
+  exit 1
+fi
+
+STORM_BIN_DIR=`dirname ${PRG}`
+export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd`
+
+#check to see if the conf dir is given as an optional argument
+if [ $# -gt 1 ]
+then
+    if [ "--config" = "$1" ]
+          then
+              conf_file=$2
+              if [ ! -f "$conf_file" ]; then
+                echo "Error: Cannot find configuration directory: $conf_file"
+                exit 1
+             fi
+              STORM_CONF_FILE=$conf_file
+              STORM_CONF_DIR=`dirname $conf_file`
+    fi
+fi
+
+export STORM_CONF_DIR="${STORM_CONF_DIR:-$STORM_BASE_DIR/conf}"
+export STORM_CONF_FILE="${STORM_CONF_FILE:-$STORM_BASE_DIR/conf/storm.yaml}"
+
+if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then
+  . "${STORM_CONF_DIR}/storm-env.sh"
+fi
+
+exec $PYTHON ${STORM_BIN_DIR}/storm.py $@

http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
new file mode 100755
index 0000000..2cf6c32
--- /dev/null
+++ b/bin/storm.py
@@ -0,0 +1,543 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import random
+import subprocess as sub
+import re
+import shlex
+try:
+    # python 3
+    from urllib.parse import quote_plus
+except ImportError:
+    # python 2
+    from urllib import quote_plus
+try:
+    # python 3
+    import configparser
+except ImportError:
+    # python 2
+    import ConfigParser as configparser
+
+def is_windows():
+    return sys.platform.startswith('win')
+
+def identity(x):
+    return x
+
+def cygpath(x):
+    command = ["cygpath", "-wp", x]
+    p = sub.Popen(command,stdout=sub.PIPE)
+    output, errors = p.communicate()
+    lines = output.split(os.linesep)
+    return lines[0]
+
+def init_storm_env():
+    global CLUSTER_CONF_DIR
+    ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini')
+    if not os.path.isfile(ini_file):
+        return
+    config = configparser.ConfigParser()
+    config.optionxform = str
+    config.read(ini_file)
+    options = config.options('environment')
+    for option in options:
+        value = config.get('environment', option)
+        os.environ[option] = value
+
+normclasspath = cygpath if sys.platform == 'cygwin' else identity
+STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2])
+USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm")
+STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None)
+
+if STORM_CONF_DIR == None:
+    CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf")
+else:
+    CLUSTER_CONF_DIR = STORM_CONF_DIR
+
+if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))):
+    USER_CONF_DIR = CLUSTER_CONF_DIR
+
+STORM_LIB_DIR = os.path.join(STORM_DIR, "lib")
+STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")
+STORM_LOGBACK_CONF_DIR = os.path.join(STORM_DIR, "logback")
+
+init_storm_env()
+
+CONFIG_OPTS = []
+CONFFILE = ""
+JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', ''))
+JAVA_HOME = os.getenv('JAVA_HOME', None)
+JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java')
+
+def get_config_opts():
+    global CONFIG_OPTS
+    return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS))
+
+if not os.path.exists(STORM_LIB_DIR):
+    print("******************************************")
+    print("The storm client can only be run from within a release. You appear 
to be trying to run the client from a checkout of Storm's source code.")
+    print("\nYou can download a Storm release at 
http://storm-project.net/downloads.html";)
+    print("******************************************")
+    sys.exit(1)
+
+def get_jars_full(adir):
+    files = os.listdir(adir)
+    ret = []
+    for f in files:
+        if f.endswith(".jar"):
+            ret.append(os.path.join(adir, f))
+    return ret
+
+def get_classpath(extrajars):
+    ret = get_jars_full(STORM_DIR)
+    ret.extend(get_jars_full(STORM_LIB_DIR))
+    ret.extend(extrajars)
+    return normclasspath(os.pathsep.join(ret))
+
+def confvalue(name, extrapaths):
+    global CONFFILE
+    command = [
+        JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + 
CONFFILE,
+        "-cp", get_classpath(extrapaths), 
"backtype.storm.command.config_value", name
+    ]
+    p = sub.Popen(command, stdout=sub.PIPE)
+    output, errors = p.communicate()
+    # python 3
+    if not isinstance(output, str):
+        output = output.decode('utf-8')
+    lines = output.split(os.linesep)
+    for line in lines:
+        tokens = line.split(" ")
+        if tokens[0] == "VALUE:":
+            return " ".join(tokens[1:])
+    return ""
+
+def print_localconfvalue(name):
+    """Syntax: [storm localconfvalue conf-name]
+
+    Prints out the value for conf-name in the local Storm configs.
+    The local Storm configs are the ones in ~/.storm/storm.yaml merged
+    in with the configs in defaults.yaml.
+    """
+    print(name + ": " + confvalue(name, [USER_CONF_DIR]))
+
+def print_remoteconfvalue(name):
+    """Syntax: [storm remoteconfvalue conf-name]
+
+    Prints out the value for conf-name in the cluster's Storm configs.
+    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
+    merged in with the configs in defaults.yaml.
+
+    This command must be run on a cluster machine.
+    """
+    print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR]))
+
+def parse_args(string):
+    r"""Takes a string of whitespace-separated tokens and parses it into a 
list.
+    Whitespace inside tokens may be quoted with single quotes, double quotes or
+    backslash (similar to command-line arguments in bash).
+
+    >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 
'j''j' k" "k l' l' mm n\\n''')
+    ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l 
l', 'mm', r'n\n']
+    """
+    re_split = re.compile(r'''((?:
+        [^\s"'\\] |
+        "(?: [^"\\] | \\.)*" |
+        '(?: [^'\\] | \\.)*' |
+        \\.
+    )+)''', re.VERBOSE)
+    args = re_split.split(string)[1::2]
+    args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args]
+    args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
+    return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
+
+def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], 
args=[], fork=False):
+    global CONFFILE
+    storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])
+    if(storm_log_dir == None or storm_log_dir == "nil"):
+        storm_log_dir = os.path.join(STORM_DIR, "logs")
+    all_args = [
+        "java", jvmtype, get_config_opts(),
+        "-Dstorm.home=" + STORM_DIR,
+        "-Dstorm.log.dir=" + storm_log_dir,
+        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
+        "-Dstorm.conf.file=" + CONFFILE,
+        "-cp", get_classpath(extrajars),
+    ] + jvmopts + [klass] + list(args)
+    print("Running: " + " ".join(all_args))
+    if fork:
+        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
+    elif is_windows():
+        # handling whitespaces in JAVA_CMD
+        sub.call(all_args)
+    else:
+        os.execvp(JAVA_CMD, all_args)
+
+def jar(jarfile, klass, *args):
+    """Syntax: [storm jar topology-jar-path class ...]
+
+    Runs the main method of class with the specified arguments.
+    The storm jars and configs in ~/.storm are put on the classpath.
+    The process is configured so that StormSubmitter
+    
(http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
+    will upload the jar at topology-jar-path when the topology is submitted.
+    """
+    exec_storm_class(
+        klass,
+        jvmtype="-client",
+        extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],
+        args=args,
+        jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
+
+def kill(*args):
+    """Syntax: [storm kill topology-name [-w wait-time-secs]]
+
+    Kills the topology with the name topology-name. Storm will
+    first deactivate the topology's spouts for the duration of
+    the topology's message timeout to allow all messages currently
+    being processed to finish processing. Storm will then shutdown
+    the workers and clean up their state. You can override the length
+    of time Storm waits between deactivation and shutdown with the -w flag.
+    """
+    exec_storm_class(
+        "backtype.storm.command.kill_topology",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+
+def upload_credentials(*args):
+    """Syntax: [storm upload_credentials topology-name [credkey credvalue]*]
+
+    Uploads a new set of credentials to a running topology
+    """
+    exec_storm_class(
+        "backtype.storm.command.upload_credentials",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+def activate(*args):
+    """Syntax: [storm activate topology-name]
+
+    Activates the specified topology's spouts.
+    """
+    exec_storm_class(
+        "backtype.storm.command.activate",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+def listtopos(*args):
+    """Syntax: [storm list]
+
+    List the running topologies and their statuses.
+    """
+    exec_storm_class(
+        "backtype.storm.command.list",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+def deactivate(*args):
+    """Syntax: [storm deactivate topology-name]
+
+    Deactivates the specified topology's spouts.
+    """
+    exec_storm_class(
+        "backtype.storm.command.deactivate",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+def rebalance(*args):
+    """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n 
new-num-workers] [-e component=parallelism]*]
+
+    Sometimes you may wish to spread out where the workers for a topology
+    are running. For example, let's say you have a 10 node cluster running
+    4 workers per node, and then let's say you add another 10 nodes to
+    the cluster. You may wish to have Storm spread out the workers for the
+    running topology so that each node runs 2 workers. One way to do this
+    is to kill the topology and resubmit it, but Storm provides a "rebalance"
+    command that provides an easier way to do this.
+
+    Rebalance will first deactivate the topology for the duration of the
+    message timeout (overridable with the -w flag) and then redistribute
+    the workers evenly around the cluster. The topology will then return to
+    its previous state of activation (so a deactivated topology will still
+    be deactivated and an activated topology will go back to being activated).
+
+    The rebalance command can also be used to change the parallelism of a 
running topology.
+    Use the -n and -e switches to change the number of workers or number of 
executors of a component
+    respectively.
+    """
+    exec_storm_class(
+        "backtype.storm.command.rebalance",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+def shell(resourcesdir, command, *args):
+    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
+    os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
+    runnerargs = [tmpjarpath, command]
+    runnerargs.extend(args)
+    exec_storm_class(
+        "backtype.storm.command.shell_submission",
+        args=runnerargs,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR],
+        fork=True)
+    os.system("rm " + tmpjarpath)
+
+def repl():
+    """Syntax: [storm repl]
+
+    Opens up a Clojure REPL with the storm jars and configuration
+    on the classpath. Useful for debugging.
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths)
+
+def get_logback_conf_dir():
+    cppaths = [CLUSTER_CONF_DIR]
+    storm_logback_conf_dir = confvalue("storm.logback.conf.dir", cppaths)
+    if(storm_logback_conf_dir == None or storm_logback_conf_dir == "nil"):
+        storm_logback_conf_dir = STORM_LOGBACK_CONF_DIR
+    return storm_logback_conf_dir
+
+def nimbus(klass="backtype.storm.daemon.nimbus"):
+    """Syntax: [storm nimbus]
+
+    Launches the nimbus daemon. This command should be run under
+    supervision with a tool like daemontools or monit.
+
+    See Setting up a Storm cluster for more information.
+    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
+        "-Dlogfile.name=nimbus.log",
+        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml"),
+    ]
+    exec_storm_class(
+        klass,
+        jvmtype="-server",
+        extrajars=cppaths,
+        jvmopts=jvmopts)
+
+def supervisor(klass="backtype.storm.daemon.supervisor"):
+    """Syntax: [storm supervisor]
+
+    Launches the supervisor daemon. This command should be run
+    under supervision with a tool like daemontools or monit.
+
+    See Setting up a Storm cluster for more information.
+    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
+        "-Dlogfile.name=supervisor.log",
+        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml"),
+    ]
+    exec_storm_class(
+        klass,
+        jvmtype="-server",
+        extrajars=cppaths,
+        jvmopts=jvmopts)
+
+def ui():
+    """Syntax: [storm ui]
+
+    Launches the UI daemon. The UI provides a web interface for a Storm
+    cluster and shows detailed stats about running topologies. This command
+    should be run under supervision with a tool like daemontools or monit.
+
+    See Setting up a Storm cluster for more information.
+    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
+        "-Dlogfile.name=ui.log",
+        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
+    ]
+    exec_storm_class(
+        "backtype.storm.ui.core",
+        jvmtype="-server",
+        jvmopts=jvmopts,
+        extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
+
+def logviewer():
+    """Syntax: [storm logviewer]
+
+    Launches the log viewer daemon. It provides a web interface for viewing
+    storm log files. This command should be run under supervision with a
+    tool like daemontools or monit.
+
+    See Setting up a Storm cluster for more information.
+    
(http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [
+        "-Dlogfile.name=logviewer.log",
+        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
+    ]
+    exec_storm_class(
+        "backtype.storm.daemon.logviewer",
+        jvmtype="-server",
+        jvmopts=jvmopts,
+        extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
+
+def drpc():
+    """Syntax: [storm drpc]
+
+    Launches a DRPC daemon. This command should be run under supervision
+    with a tool like daemontools or monit.
+
+    See Distributed RPC for more information.
+    (http://storm.incubator.apache.org/documentation/Distributed-RPC)
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [
+        "-Dlogfile.name=drpc.log",
+        "-Dlogback.configurationFile=" + os.path.join(get_logback_conf_dir(), 
"cluster.xml")
+    ]
+    exec_storm_class(
+        "backtype.storm.daemon.drpc",
+        jvmtype="-server",
+        jvmopts=jvmopts,
+        extrajars=[CLUSTER_CONF_DIR])
+
+def dev_zookeeper():
+    """Syntax: [storm dev-zookeeper]
+
+    Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local 
dir and
+    "storm.zookeeper.port" as its port. This is only intended for 
development/testing, the
+    Zookeeper instance launched is not configured to be used in production.
+    """
+    cppaths = [CLUSTER_CONF_DIR]
+    exec_storm_class(
+        "backtype.storm.command.dev_zookeeper",
+        jvmtype="-server",
+        extrajars=[CLUSTER_CONF_DIR])
+
+def version():
+  """Syntax: [storm version]
+
+  Prints the version number of this Storm release.
+  """
+  cppaths = [CLUSTER_CONF_DIR]
+  exec_storm_class(
+       "backtype.storm.utils.VersionInfo",
+       jvmtype="-client",
+       extrajars=[CLUSTER_CONF_DIR])
+
+def print_classpath():
+    """Syntax: [storm classpath]
+
+    Prints the classpath used by the storm client when running commands.
+    """
+    print(get_classpath([]))
+
+def monitor(*args):
+    """Syntax: [storm monitor topology-name [-i interval-secs] [-m 
component-id] [-s stream-id] [-w [emitted | transferred]]]
+
+    Monitor given topology's throughput interactively.
+    One can specify poll-interval, component-id, stream-id, watch-item[emitted 
| transferred]
+    By default,
+        poll-interval is 4 seconds;
+        all component-ids will be list;
+        stream-id is 'default';
+        watch-item is 'emitted';
+    """
+    exec_storm_class(
+        "backtype.storm.command.monitor",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
+
+def print_commands():
+    """Print all client commands and link to documentation"""
+    print("Commands:\n\t" +  "\n\t".join(sorted(COMMANDS.keys())))
+    print("\nHelp: \n\thelp \n\thelp <command>")
+    print("\nDocumentation for the storm client can be found at 
http://storm.incubator.apache.org/documentation/Command-line-client.html\n";)
+    print("Configs can be overridden using one or more -c flags, e.g. \"storm 
list -c nimbus.host=nimbus.mycompany.com\"\n")
+
+def print_usage(command=None):
+    """Print one help message or list of available commands"""
+    if command != None:
+        if command in COMMANDS:
+            print(COMMANDS[command].__doc__ or
+                  "No documentation provided for <%s>" % command)
+        else:
+           print("<%s> is not a valid command" % command)
+    else:
+        print_commands()
+
+def unknown_command(*args):
+    print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:]))
+    print_usage()
+    sys.exit(254)
+
+COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": 
ui, "logviewer": logviewer,
+            "drpc": drpc, "supervisor": supervisor, "localconfvalue": 
print_localconfvalue,
+            "remoteconfvalue": print_remoteconfvalue, "repl": repl, 
"classpath": print_classpath,
+            "activate": activate, "deactivate": deactivate, "rebalance": 
rebalance, "help": print_usage,
+            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": 
version, "monitor": monitor,
+            "upload-credentials": upload_credentials}
+
+def parse_config(config_list):
+    global CONFIG_OPTS
+    if len(config_list) > 0:
+        for config in config_list:
+            CONFIG_OPTS.append(config)
+
+def parse_config_opts(args):
+  curr = args[:]
+  curr.reverse()
+  config_list = []
+  args_list = []
+
+  while len(curr) > 0:
+    token = curr.pop()
+    if token == "-c":
+      config_list.append(curr.pop())
+    elif token == "--config":
+      global CONFFILE
+      CONFFILE = curr.pop()
+    else:
+      args_list.append(token)
+
+  return config_list, args_list
+
+def main():
+    if len(sys.argv) <= 1:
+        print_usage()
+        sys.exit(-1)
+    global CONFIG_OPTS
+    config_list, args = parse_config_opts(sys.argv[1:])
+    parse_config(config_list)
+    COMMAND = args[0]
+    ARGS = args[1:]
+    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
+
+if __name__ == "__main__":
+    main()

http://git-wip-us.apache.org/repos/asf/storm/blob/a4a8f7d2/conf/storm-env.sh
----------------------------------------------------------------------
diff --git a/conf/storm-env.sh b/conf/storm-env.sh
new file mode 100644
index 0000000..f00525c
--- /dev/null
+++ b/conf/storm-env.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# Copyright 2014 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set Storm specific environment variables here.
+
+# The java implementation to use.
+export JAVA_HOME=${JAVA_HOME}
+
+# export STORM_CONF_DIR=""

Reply via email to