implement/document environment variable substitution; document command line switch pass-through;
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44d5b5be Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44d5b5be Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44d5b5be Branch: refs/heads/master Commit: 44d5b5be2dc0e2334fa1e4bd5934eb1f57985084 Parents: d8d6cfa Author: P. Taylor Goetz <ptgo...@gmail.com> Authored: Tue Mar 31 00:16:37 2015 -0400 Committer: P. Taylor Goetz <ptgo...@gmail.com> Committed: Tue Mar 31 00:16:37 2015 -0400 ---------------------------------------------------------------------- README.md | 35 ++++++++++++++-- .../main/java/org/apache/storm/flux/Flux.java | 15 +++++-- .../apache/storm/flux/parser/FluxParser.java | 35 +++++++++++----- .../java/org/apache/storm/flux/TCKTest.java | 43 ++++++++++++-------- .../resources/configs/substitution-test.yaml | 2 + 5 files changed, 97 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index ebf9d5a..71403e0 100644 --- a/README.md +++ b/README.md @@ -149,9 +149,15 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux -d,--dry-run Do not run or deploy the topology. Just build, validate, and print information about the topology. - -f,--filter <file> Use the specified file as a source of - properties, and perform variable - substitution. + -e,--env-filter Perform environment variable substitution. + Replace keysidentified with `${ENV-[NAME]}` + will be replaced with the corresponding + `NAME` environment value + -f,--filter <file> Perform property substitution. Use the + specified file as a source of properties, + and replace keys identified with {$[property + name]} with the value defined in the + properties file. -i,--inactive Deploy the topology, but do not activate it. -l,--local Run the topology in local mode. -n,--no-splash Suppress the printing of the splash screen. @@ -167,6 +173,16 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux instead of the in-process ZooKeeper. ``` +**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line +switches to pass through to the `storm` command. + +For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following +example command will run Flux and override the `nimus.host` configuration: + +```bash +storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost +``` + ### Sample output ``` âââââââââââ âââ ââââââ âââ @@ -281,6 +297,13 @@ You would then be able to reference those properties by key in your `.yaml` file In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents. +### Environment Variable Substitution/Filtering +Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` if defined, +you can reference it in a Flux YAML file with the following syntax: + +``` +${ENV-ZK_HOSTS} +``` ## Components Components are essentially named object instances that are made available as configuration options for spouts and @@ -772,6 +795,12 @@ topologySource: methodName: "getTopologyWithDifferentMethodName" ``` +## Author +P. Taylor Goetz + +## Contributors + + ## Contributing Contributions in any form are more than welcome. http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/Flux.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java index dcd3953..2c2105c 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java +++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@ -55,6 +55,7 @@ public class Flux { private static final String OPTION_INACTIVE = "inactive"; private static final String OPTION_ZOOKEEPER = "zookeeper"; private static final String OPTION_FILTER = "filter"; + private static final String OPTION_ENV_FILTER = "env-filter"; public static void main(String[] args) throws Exception { Options options = new Options(); @@ -80,8 +81,12 @@ public class Flux { options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " + "specified <host>:<port> instead of the in-process ZooKeeper.")); - options.addOption(option(1, "f", OPTION_FILTER, "file", "Use the specified file as a source of properties, and " + - "perform variable substitution.")); + options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " + + "as a source of properties, and replace keys identified with {$[property name]} with the value defined " + + "in the properties file.")); + + options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" + + "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value")); CommandLineParser parser = new BasicParser(); CommandLine cmd = parser.parse(options, args); @@ -129,13 +134,15 @@ public class Flux { filterProps = cmd.getOptionValue(OPTION_FILTER); } + + boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER); if(cmd.hasOption(OPTION_RESOURCE)){ printf("Parsing classpath resource: %s", filePath); - topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps); + topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter); } else { printf("Parsing file: %s", new File(filePath).getAbsolutePath()); - topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps); + topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter); } http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java ---------------------------------------------------------------------- diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 109330d..78c52d5 100644 --- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -43,46 +43,49 @@ public class FluxParser { // TODO refactor input stream processing (see parseResource() method). public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, - String propertiesFile) throws IOException { + String propertiesFile, boolean envSub) throws IOException { Yaml yaml = yaml(); FileInputStream in = new FileInputStream(inputFile); // TODO process properties, etc. - TopologyDef topology = loadYaml(yaml, in, propertiesFile); + TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology, propertiesFile); + return processIncludes(yaml, topology, propertiesFile, envSub); } else { return topology; } } public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, - String propertiesFile) throws IOException { + String propertiesFile, boolean envSub) throws IOException { Yaml yaml = yaml(); InputStream in = FluxParser.class.getResourceAsStream(resource); - TopologyDef topology = loadYaml(yaml, in, propertiesFile); + TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub); in.close(); if(dumpYaml){ dumpYaml(topology, yaml); } if(processIncludes) { - return processIncludes(yaml, topology, propertiesFile); + return processIncludes(yaml, topology, propertiesFile, envSub); } else { return topology; } } - private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile) throws IOException { + private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); LOG.info("loading YAML from input stream..."); int b = -1; while((b = in.read()) != -1){ bos.write(b); } + + // TODO substitution implementation is not exactly efficient or kind to memory... String str = bos.toString(); + // properties file substitution if(propsFile != null){ LOG.info("Performing property substitution."); InputStream propsIn = new FileInputStream(propsFile); @@ -94,6 +97,17 @@ public class FluxParser { } else { LOG.info("Not performing property substitution."); } + + // environment variable substitution + if(envSubstitution){ + LOG.info("Performing environment variable substitution..."); + Map<String, String> envs = System.getenv(); + for(String key : envs.keySet()){ + str = str.replace("${ENV-" + key + "}", envs.get(key)); + } + } else { + LOG.info("Not performing environment variable substitution."); + } return (TopologyDef)yaml.load(str); } @@ -120,17 +134,18 @@ public class FluxParser { * @param topologyDef the topology definition containing (possibly zero) includes * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile) throws IOException { + private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub) + throws IOException { //TODO support multiple levels of includes if(topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()){ TopologyDef includeTopologyDef = null; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); - includeTopologyDef = parseResource(include.getFile(), true, false, propsFile); + includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub); } else { LOG.info("Loading includes from file: {}", include.getFile()); - includeTopologyDef = parseFile(include.getFile(), true, false, propsFile); + includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub); } // if overrides are disabled, we won't replace anything that already exists http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java ---------------------------------------------------------------------- diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index f6076cc..6580ef7 100644 --- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -32,7 +32,7 @@ import static org.junit.Assert.*; public class TCKTest { @Test public void testTCK() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -42,7 +42,7 @@ public class TCKTest { @Test public void testShellComponents() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -52,7 +52,7 @@ public class TCKTest { @Test public void testKafkaSpoutConfig() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -62,7 +62,7 @@ public class TCKTest { @Test public void testLoadFromResource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -73,7 +73,7 @@ public class TCKTest { @Test public void testHdfs() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -83,7 +83,7 @@ public class TCKTest { @Test public void testIncludes() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -96,7 +96,7 @@ public class TCKTest { @Test public void testTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -107,7 +107,7 @@ public class TCKTest { @Test public void testTopologySourceWithReflection() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -118,7 +118,7 @@ public class TCKTest { @Test public void testTopologySourceWithConfigParam() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -129,7 +129,7 @@ public class TCKTest { @Test public void testTopologySourceWithMethodName() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -141,7 +141,7 @@ public class TCKTest { @Test public void testTridentTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -152,7 +152,7 @@ public class TCKTest { @Test(expected = IllegalArgumentException.class) public void testInvalidTopologySource() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null, false); assertFalse("Topology config is invalid.", topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -162,7 +162,7 @@ public class TCKTest { @Test public void testTopologySourceWithGetMethodName() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -173,7 +173,7 @@ public class TCKTest { @Test public void testTopologySourceWithConfigMethods() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null); + TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null, false); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -190,7 +190,7 @@ public class TCKTest { @Test public void testVariableSubstitution() throws Exception { - TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties"); + TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true); assertTrue(topologyDef.validate()); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); @@ -198,6 +198,17 @@ public class TCKTest { assertNotNull(topology); topology.validate(); - assertTrue(context.getTopologyDef().getName().equals("substitution-topology")); + // test basic substitution + assertEquals("Property not replaced.", + "substitution-topology", + context.getTopologyDef().getName()); + + // test environment variable substitution + // $PATH should be defined on most systems + String envPath = System.getenv().get("PATH"); + assertEquals("ENV variable not replaced.", + envPath, + context.getTopologyDef().getConfig().get("test.env.value")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/resources/configs/substitution-test.yaml ---------------------------------------------------------------------- diff --git a/flux-core/src/test/resources/configs/substitution-test.yaml b/flux-core/src/test/resources/configs/substitution-test.yaml index cbfeea4..13f1960 100644 --- a/flux-core/src/test/resources/configs/substitution-test.yaml +++ b/flux-core/src/test/resources/configs/substitution-test.yaml @@ -41,6 +41,8 @@ name: "${topology.name}" # config: topology.workers: 1 + # test environent variable substitution + test.env.value: ${ENV-PATH} # ... # spout definitions