Repository: metron Updated Branches: refs/heads/master 8ef18d31a -> 38eebd59c
METRON-1134: Allow parser command line options to be specified in the zookeeper parser config. closes apache/incubator-metron#717 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/38eebd59 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/38eebd59 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/38eebd59 Branch: refs/heads/master Commit: 38eebd59ce2884ead7f3797ff98f7308409e05dd Parents: 8ef18d3 Author: cstella <ceste...@gmail.com> Authored: Thu Aug 31 15:46:29 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Thu Aug 31 15:46:29 2017 -0400 ---------------------------------------------------------------------- ...orParserConfigControllerIntegrationTest.java | 18 +- .../configuration/SensorParserConfig.java | 208 ++++++++- .../management/ConfigurationFunctionsTest.java | 10 +- metron-platform/metron-parsers/README.md | 11 + .../parsers/topology/ParserTopologyBuilder.java | 87 ++-- .../parsers/topology/ParserTopologyCLI.java | 218 +++++++--- .../parsers/topology/config/ValueSupplier.java | 30 ++ .../components/ParserTopologyComponent.java | 30 +- .../parsers/topology/ParserTopologyCLITest.java | 425 ++++++++++++++++++- 9 files changed, 934 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java index f5ea23d..66771eb 100644 --- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.metron.rest.controller; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.io.FileUtils; +import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.service.GrokService; import org.apache.metron.rest.service.SensorParserConfigService; @@ -37,6 +38,7 @@ import org.springframework.web.context.WebApplicationContext; import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; import static org.hamcrest.Matchers.hasSize; @@ -196,11 +198,17 @@ public class SensorParserConfigControllerIntegrationTest { cleanFileSystem(); this.sensorParserConfigService.delete("broTest"); this.sensorParserConfigService.delete("squidTest"); - + Method[] method = SensorParserConfig.class.getMethods(); + int numFields = 0; + for(Method m : method) { + if(m.getName().startsWith("set")) { + numFields++; + } + } this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson)) .andExpect(status().isCreated()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(10))) + .andExpect(jsonPath("$.*", hasSize(numFields))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser")) .andExpect(jsonPath("$.sensorTopic").value("squidTest")) .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest")) @@ -215,7 +223,7 @@ public class 
SensorParserConfigControllerIntegrationTest { this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(10))) + .andExpect(jsonPath("$.*", hasSize(numFields))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser")) .andExpect(jsonPath("$.sensorTopic").value("squidTest")) .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest")) @@ -244,7 +252,7 @@ public class SensorParserConfigControllerIntegrationTest { this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isCreated()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(10))) + .andExpect(jsonPath("$.*", hasSize(numFields))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser")) .andExpect(jsonPath("$.sensorTopic").value("broTest")) .andExpect(jsonPath("$.readMetadata").value("true")) @@ -254,7 +262,7 @@ public class SensorParserConfigControllerIntegrationTest { this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(10))) + .andExpect(jsonPath("$.*", hasSize(numFields))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser")) .andExpect(jsonPath("$.sensorTopic").value("broTest")) .andExpect(jsonPath("$.readMetadata").value("true")) 
http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java index f08e9c4..2d0ccd8 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java @@ -37,7 +37,155 @@ public class SensorParserConfig implements Serializable { private String invalidWriterClassName; private Boolean readMetadata = false; private Boolean mergeMetadata = false; + private Integer numWorkers = null; + private Integer numAckers= null; + private Integer spoutParallelism = 1; + private Integer spoutNumTasks = 1; + private Integer parserParallelism = 1; + private Integer parserNumTasks = 1; + private Integer errorWriterParallelism = 1; + private Integer errorWriterNumTasks = 1; + private Map<String, Object> spoutConfig = new HashMap<>(); + private String securityProtocol = null; + private Map<String, Object> stormConfig = new HashMap<>(); + + /** + * Return the number of workers for the topology. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getNumWorkers() { + return numWorkers; + } + + public void setNumWorkers(Integer numWorkers) { + this.numWorkers = numWorkers; + } + + /** + * Return the number of ackers for the topology. This property will be used for the parser unless overridden on the CLI. 
+ * @return + */ + public Integer getNumAckers() { + return numAckers; + } + + public void setNumAckers(Integer numAckers) { + this.numAckers = numAckers; + } + + /** + * Return the spout parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getSpoutParallelism() { + return spoutParallelism; + } + + public void setSpoutParallelism(Integer spoutParallelism) { + this.spoutParallelism = spoutParallelism; + } + + /** + * Return the spout num tasks. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getSpoutNumTasks() { + return spoutNumTasks; + } + + public void setSpoutNumTasks(Integer spoutNumTasks) { + this.spoutNumTasks = spoutNumTasks; + } + + /** + * Return the parser parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getParserParallelism() { + return parserParallelism; + } + + public void setParserParallelism(Integer parserParallelism) { + this.parserParallelism = parserParallelism; + } + + /** + * Return the parser number of tasks. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getParserNumTasks() { + return parserNumTasks; + } + + public void setParserNumTasks(Integer parserNumTasks) { + this.parserNumTasks = parserNumTasks; + } + + /** + * Return the error writer bolt parallelism. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Integer getErrorWriterParallelism() { + return errorWriterParallelism; + } + + public void setErrorWriterParallelism(Integer errorWriterParallelism) { + this.errorWriterParallelism = errorWriterParallelism; + } + + /** + * Return the error writer bolt number of tasks. This property will be used for the parser unless overridden on the CLI. 
+ * @return + */ + public Integer getErrorWriterNumTasks() { + return errorWriterNumTasks; + } + + public void setErrorWriterNumTasks(Integer errorWriterNumTasks) { + this.errorWriterNumTasks = errorWriterNumTasks; + } + + /** + * Return the spout config. This includes kafka properties. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Map<String, Object> getSpoutConfig() { + return spoutConfig; + } + public void setSpoutConfig(Map<String, Object> spoutConfig) { + this.spoutConfig = spoutConfig; + } + + /** + * Return security protocol to use. This property will be used for the parser unless overridden on the CLI. + * The order of precedence is CLI > spout config > config in the sensor parser config. + * @return + */ + public String getSecurityProtocol() { + return securityProtocol; + } + + public void setSecurityProtocol(String securityProtocol) { + this.securityProtocol = securityProtocol; + } + + /** + * Return Storm topologyconfig. This property will be used for the parser unless overridden on the CLI. + * @return + */ + public Map<String, Object> getStormConfig() { + return stormConfig; + } + + public void setStormConfig(Map<String, Object> stormConfig) { + this.stormConfig = stormConfig; + } + + /** + * Return whether or not to merge metadata sent into the message. If true, then metadata become proper fields. + * @return + */ public Boolean getMergeMetadata() { return mergeMetadata; } @@ -46,6 +194,10 @@ public class SensorParserConfig implements Serializable { this.mergeMetadata = mergeMetadata; } + /** + * Return whether or not to read metadata at all. 
+ * @return + */ public Boolean getReadMetadata() { return readMetadata; } @@ -145,10 +297,21 @@ public class SensorParserConfig implements Serializable { ", writerClassName='" + writerClassName + '\'' + ", errorWriterClassName='" + errorWriterClassName + '\'' + ", invalidWriterClassName='" + invalidWriterClassName + '\'' + - ", parserConfig=" + parserConfig + - ", fieldTransformations=" + fieldTransformations + ", readMetadata=" + readMetadata + ", mergeMetadata=" + mergeMetadata + + ", numWorkers=" + numWorkers + + ", numAckers=" + numAckers + + ", spoutParallelism=" + spoutParallelism + + ", spoutNumTasks=" + spoutNumTasks + + ", parserParallelism=" + parserParallelism + + ", parserNumTasks=" + parserNumTasks + + ", errorWriterParallelism=" + errorWriterParallelism + + ", errorWriterNumTasks=" + errorWriterNumTasks + + ", spoutConfig=" + spoutConfig + + ", securityProtocol='" + securityProtocol + '\'' + + ", stormConfig=" + stormConfig + + ", parserConfig=" + parserConfig + + ", fieldTransformations=" + fieldTransformations + '}'; } @@ -171,12 +334,34 @@ public class SensorParserConfig implements Serializable { return false; if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null) return false; - if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) - return false; if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null) return false; if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null) return false; + if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null) + return false; + if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null) + return false; + if (getSpoutParallelism() != null ? 
!getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null) + return false; + if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null) + return false; + if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null) + return false; + if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null) + return false; + if (getErrorWriterParallelism() != null ? !getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null) + return false; + if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null) + return false; + if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null) + return false; + if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null) + return false; + if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null) + return false; + if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) + return false; return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null; } @@ -189,10 +374,21 @@ public class SensorParserConfig implements Serializable { result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0); result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0); result = 31 * result + (getInvalidWriterClassName() != null ? 
getInvalidWriterClassName().hashCode() : 0); - result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); - result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0); result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0); result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0); + result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0); + result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0); + result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0); + result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0); + result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0); + result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0); + result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0); + result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0); + result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0); + result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0); + result = 31 * result + (getStormConfig() != null ? getStormConfig().hashCode() : 0); + result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); + result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java index 431ece2..31eeafe 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java @@ -81,7 +81,15 @@ public class ConfigurationFunctionsTest { "parserConfig" : { }, "fieldTransformations" : [ ], "readMetadata":false, - "mergeMetadata":false + "mergeMetadata":false, + "parserParallelism" : 1, + "errorWriterParallelism" : 1, + "spoutNumTasks" : 1, + "stormConfig" : {}, + "errorWriterNumTasks":1, + "spoutConfig":{}, + "parserNumTasks":1, + "spoutParallelism":1 } */ @Multiline http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 9de4341..141e232 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -103,6 +103,17 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default). See below for a discussion about metadata. * `parserConfig` : A JSON Map representing the parser implementation specific configuration. 
* `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. +* `spoutParallelism` : The kafka spout parallelism (defaults to `1`). This can be overridden on the command line. +* `spoutNumTasks` : The number of tasks for the spout (defaults to `1`). This can be overridden on the command line. +* `parserParallelism` : The parser bolt parallelism (defaults to `1`). This can be overridden on the command line. +* `parserNumTasks` : The number of tasks for the parser bolt (defaults to `1`). This can be overridden on the command line. +* `errorWriterParallelism` : The error writer bolt parallelism (defaults to `1`). This can be overridden on the command line. +* `errorWriterNumTasks` : The number of tasks for the error writer bolt (defaults to `1`). This can be overridden on the command line. +* `numWorkers` : The number of workers to use in the topology (default is the storm default of `1`). +* `numAckers` : The number of acker executors to use in the topology (default is the storm default of `1`). +* `spoutConfig` : A map representing a custom spout config. This can be overridden on the command line. +* `securityProtocol` : The security protocol to use for reading from kafka (this is a string). This can be overridden on the command line and also specified in the spout config via the `security.protocol` key. If it is specified in more than one place, the order of precedence is the command line, then the spout config, then this property. +* `stormConfig` : The storm config to use (this is a map). This can be overridden on the command line. If both are specified, they are merged with CLI properties taking precedence. The `fieldTransformations` is a complex object which defines a transformation which can be done to a message. 
This transformation can http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index feac80b..c918703 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -18,9 +18,11 @@ package org.apache.metron.parsers.topology; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.metron.storm.kafka.flux.StormKafkaSpout; +import org.apache.storm.Config; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; @@ -48,39 +50,70 @@ import java.util.*; */ public class ParserTopologyBuilder { + public static class ParserTopology { + private TopologyBuilder builder; + private Config topologyConfig; + + private ParserTopology(TopologyBuilder builder, Config topologyConfig) { + this.builder = builder; + this.topologyConfig = topologyConfig; + } + + + public TopologyBuilder getBuilder() { + return builder; + } + + public Config getTopologyConfig() { + return topologyConfig; + } + } + /** * Builds a Storm topology that parses telemetry data received from an external sensor. 
* * @param zookeeperUrl Zookeeper URL * @param brokerUrl Kafka Broker URL * @param sensorType Type of sensor - * @param spoutParallelism Parallelism hint for the spout - * @param spoutNumTasks Number of tasks for the spout - * @param parserParallelism Parallelism hint for the parser bolt - * @param parserNumTasks Number of tasks for the parser bolt - * @param errorWriterParallelism Parallelism hint for the bolt that handles errors - * @param errorWriterNumTasks Number of tasks for the bolt that handles errors - * @param kafkaSpoutConfig Configuration options for the kafka spout + * @param spoutParallelismSupplier Supplier for the parallelism hint for the spout + * @param spoutNumTasksSupplier Supplier for the number of tasks for the spout + * @param parserParallelismSupplier Supplier for the parallelism hint for the parser bolt + * @param parserNumTasksSupplier Supplier for the number of tasks for the parser bolt + * @param errorWriterParallelismSupplier Supplier for the parallelism hint for the bolt that handles errors + * @param errorWriterNumTasksSupplier Supplier for the number of tasks for the bolt that handles errors + * @param kafkaSpoutConfigSupplier Supplier for the configuration options for the kafka spout + * @param securityProtocolSupplier Supplier for the security protocol + * @param outputTopic The output kafka topic + * @param stormConfigSupplier Supplier for the storm config * @return A Storm topology that parses telemetry data received from an external sensor * @throws Exception */ - public static TopologyBuilder build(String zookeeperUrl, + public static ParserTopology build(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, - int spoutParallelism, - int spoutNumTasks, - int parserParallelism, - int parserNumTasks, - int errorWriterParallelism, - int errorWriterNumTasks, - Map<String, Object> kafkaSpoutConfig, - Optional<String> securityProtocol, - Optional<String> outputTopic + ValueSupplier<Integer> spoutParallelismSupplier, + 
ValueSupplier<Integer> spoutNumTasksSupplier, + ValueSupplier<Integer> parserParallelismSupplier, + ValueSupplier<Integer> parserNumTasksSupplier, + ValueSupplier<Integer> errorWriterParallelismSupplier, + ValueSupplier<Integer> errorWriterNumTasksSupplier, + ValueSupplier<Map> kafkaSpoutConfigSupplier, + ValueSupplier<String> securityProtocolSupplier, + Optional<String> outputTopic, + ValueSupplier<Config> stormConfigSupplier ) throws Exception { // fetch configuration from zookeeper ParserConfigurations configs = new ParserConfigurations(); SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs); + int spoutParallelism = spoutParallelismSupplier.get(parserConfig, Integer.class); + int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class); + int parserParallelism = parserParallelismSupplier.get(parserConfig, Integer.class); + int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class); + int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class); + int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class); + Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class); + Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class)); // create the spout TopologyBuilder builder = new TopologyBuilder(); @@ -102,7 +135,7 @@ public class ParserTopologyBuilder { .shuffleGrouping("parserBolt", Constants.ERROR_STREAM); } - return builder; + return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, Config.class)); } /** @@ -247,16 +280,16 @@ public class ParserTopologyBuilder { * @throws Exception */ private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception { - CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl); - client.start(); - 
ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client); - SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType); - if (parserConfig == null) { - throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." + - " Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command."); + try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) { + client.start(); + ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client); + SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType); + if (parserConfig == null) { + throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." + + " Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command."); + } + return parserConfig; } - client.close(); - return parserConfig; } /** http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 8ff4f93..b5ee628 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -18,10 +18,14 @@ package org.apache.metron.parsers.topology; import org.apache.metron.common.Constants; +import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import 
org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import com.fasterxml.jackson.core.type.TypeReference; @@ -235,12 +239,19 @@ public class ParserTopologyCLI { return has(cli)?cli.getOptionValue(shortCode):def; } - public static Config getConfig(CommandLine cli) { - Config config = new Config(); + public static Optional<Config> getConfig(CommandLine cli) { + return getConfig(cli, new Config()); + } + + public static Optional<Config> getConfig(CommandLine cli, Config config) { + if(EXTRA_OPTIONS.has(cli)) { + Map<String, Object> extraOptions = readJSONMapFromFile(new File(EXTRA_OPTIONS.get(cli))); + config.putAll(extraOptions); + } for(ParserOptions option : ParserOptions.values()) { config = option.configHandler.apply(new Arg(config, option.get(cli))); } - return config; + return config.isEmpty()?Optional.empty():Optional.of(config); } public static CommandLine parse(CommandLineParser parser, String[] args) throws ParseException { @@ -273,65 +284,172 @@ public class ParserTopologyCLI { } } + private static CommandLine parse(Options options, String[] args) { + CommandLineParser parser = new PosixParser(); + try { + return ParserOptions.parse(parser, args); + } catch (ParseException pe) { + pe.printStackTrace(); + final HelpFormatter usageFormatter = new HelpFormatter(); + usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); + System.exit(-1); + return null; + } + } + + public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception { + String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd); + Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); + String sensorType= 
ParserOptions.SENSOR_TYPE.get(cmd); + + /* + It bears mentioning why we're creating this ValueSupplier indirection here. + As a separation of responsibilities, the CLI class defines the order of precedence + for the various topological and structural properties for creating a parser. This is + desirable because there are now (i.e. integration tests) + and may be in the future (i.e. a REST service to start parsers without using the CLI) + other mechanisms to construct parser topologies. It's sensible to split those concerns.. + + Unfortunately, determining the structural parameters for a parser requires interacting with + external services (e.g. zookeeper) that are set up well within the ParserTopology class. + Rather than pulling the infrastructure to interact with those services out and moving it into the + CLI class and breaking that separation of concerns, we've created a supplier + indirection where are providing the logic as to how to create precedence in the CLI class + without owning the responsibility of constructing the infrastructure where the values are + necessarily supplied. 
+ + */ + ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1); + }; + ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1); + }; + ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> { + if(ParserOptions.PARSER_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1); + }; + + ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1); + }; + + ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> { + if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1); + }; + + ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1); + }; + + ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_CONFIG.has(cmd)) { + return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); + } + return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>()); + }; + + 
ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> { + Optional<String> sp = Optional.empty(); + if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) { + sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)); + } + if (!sp.isPresent()) { + sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class)); + } + return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null)); + }; + + ValueSupplier<Config> stormConf = (parserConfig, clazz) -> { + Map<String, Object> c = parserConfig.getStormConfig(); + Config finalConfig = new Config(); + if(c != null && !c.isEmpty()) { + finalConfig.putAll(c); + } + if(parserConfig.getNumAckers() != null) { + Config.setNumAckers(finalConfig, parserConfig.getNumAckers()); + } + if(parserConfig.getNumWorkers() != null) { + Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers()); + } + return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig); + }; + + Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); + + return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic); + } + + protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl + , Optional<String> brokerUrl + , String sensorType + , ValueSupplier<Integer> spoutParallelism + , ValueSupplier<Integer> spoutNumTasks + , ValueSupplier<Integer> parserParallelism + , ValueSupplier<Integer> parserNumTasks + , ValueSupplier<Integer> errorParallelism + , ValueSupplier<Integer> errorNumTasks + , ValueSupplier<Map> spoutConfig + , ValueSupplier<String> securityProtocol + , ValueSupplier<Config> stormConf + , Optional<String> outputTopic + ) throws Exception + { + return ParserTopologyBuilder.build(zookeeperUrl, + brokerUrl, + sensorType, + spoutParallelism, + 
spoutNumTasks, + parserParallelism, + parserNumTasks, + errorParallelism, + errorNumTasks, + spoutConfig, + securityProtocol, + outputTopic, + stormConf + ); + } + + public static void main(String[] args) { - Options options = new Options(); try { - CommandLineParser parser = new PosixParser(); - CommandLine cmd = null; - try { - cmd = ParserOptions.parse(parser, args); - } catch (ParseException pe) { - pe.printStackTrace(); - final HelpFormatter usageFormatter = new HelpFormatter(); - usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); - System.exit(-1); - } + Options options = new Options(); + final CommandLine cmd = parse(options, args); if (cmd.hasOption("h")) { final HelpFormatter usageFormatter = new HelpFormatter(); usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); System.exit(0); } - String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);; - Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); + ParserTopologyCLI cli = new ParserTopologyCLI(); + ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd); String sensorType= ParserOptions.SENSOR_TYPE.get(cmd); - int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); - int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); - int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); - int parserNumTasks= Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); - int errorParallelism = Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); - int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); - int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1")); - int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1")); - Map<String, Object> spoutConfig = new 
HashMap<>(); - if(ParserOptions.SPOUT_CONFIG.has(cmd)) { - spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); - } - Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); - Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty(); - securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig); - TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, - brokerUrl, - sensorType, - spoutParallelism, - spoutNumTasks, - parserParallelism, - parserNumTasks, - errorParallelism, - errorNumTasks, - spoutConfig, - securityProtocol, - outputTopic - ); - Config stormConf = ParserOptions.getConfig(cmd); if (ParserOptions.TEST.has(cmd)) { - stormConf.put(Config.TOPOLOGY_DEBUG, true); + topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true); LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(sensorType, stormConf, builder.createTopology()); + cluster.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology()); Utils.sleep(300000); cluster.shutdown(); } else { - StormSubmitter.submitTopology(sensorType, stormConf, builder.createTopology()); + StormSubmitter.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology()); } } catch (Exception e) { e.printStackTrace(); @@ -347,13 +465,13 @@ public class ParserTopologyCLI { if(!ret.isPresent()) { ret = Optional.ofNullable((String) spoutConfig.get("security.protocol")); } - if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) { + if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) { ret = Optional.empty(); } return ret; } - private static Map<String, Object> readSpoutConfig(File inputFile) { + private static Map<String, Object> readJSONMapFromFile(File inputFile) { String json = null; if 
(inputFile.exists()) { try { http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java new file mode 100644 index 0000000..0ede0f8 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers.topology.config; + +import org.apache.metron.common.configuration.SensorParserConfig; + + +/** + * Supplies a value given a sensor config. 
+ * @param <T> + */ +public interface ValueSupplier<T> { + T get(SensorParserConfig config, Class<T> clazz); +} http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 3e8e2db..63d9e52 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -87,23 +87,29 @@ public class ParserTopologyComponent implements InMemoryComponent { @Override public void start() throws UnableToStartException { try { - TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY) + final Map<String, Object> stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_DEBUG, true); + ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY) , Optional.ofNullable(brokerUrl) , sensorType - , 1 - , 1 - , 1 - , 1 - , 1 - , 1 - , null - , Optional.empty() + , (x,y) -> 1 + , (x,y) -> 1 + , (x,y) -> 1 + , (x,y) -> 1 + , (x,y) -> 1 + , (x,y) -> 1 + , (x,y) -> new HashMap<>() + , (x,y) -> null , Optional.ofNullable(outputTopic) + , (x,y) -> { + Config c = new Config(); + c.putAll(stormConf); + return c; + } ); - Map<String, Object> stormConf = new HashMap<>(); - stormConf.put(Config.TOPOLOGY_DEBUG, true); + stormCluster = new LocalCluster(); - 
stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.createTopology()); + stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology()); } catch (Exception e) { throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e); } http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index 5f536a5..97dac5a 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -18,8 +18,12 @@ package org.apache.metron.parsers.topology; +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.cli.Parser; import org.apache.log4j.Level; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.storm.Config; import com.google.common.collect.ImmutableMap; @@ -34,7 +38,10 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.lang.ref.Reference; import java.util.*; +import java.util.function.Predicate; +import java.util.function.Supplier; public class ParserTopologyCLITest { @@ -142,7 +149,8 @@ public class ParserTopologyCLITest { .with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3") .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4") 
.build(longOpt); - Config config = ParserTopologyCLI.ParserOptions.getConfig(cli); + Optional<Config> configOptional = ParserTopologyCLI.ParserOptions.getConfig(cli); + Config config = configOptional.get(); Assert.assertEquals(1, config.get(Config.TOPOLOGY_WORKERS)); Assert.assertEquals(2, config.get(Config.TOPOLOGY_ACKER_EXECUTORS)); Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); @@ -189,7 +197,8 @@ public class ParserTopologyCLITest { .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4") .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath()) .build(longOpt); - Config config = ParserTopologyCLI.ParserOptions.getConfig(cli); + Optional<Config> configOptional = ParserTopologyCLI.ParserOptions.getConfig(cli); + Config config = configOptional.get(); Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); Assert.assertEquals("foo", config.get("string")); Assert.assertEquals(1, config.get("integer")); @@ -197,4 +206,416 @@ public class ParserTopologyCLITest { extraFile.deleteOnExit(); } } + + private static class ParserInput { + private Integer spoutParallelism; + private Integer spoutNumTasks; + private Integer parserParallelism; + private Integer parserNumTasks; + private Integer errorParallelism; + private Integer errorNumTasks; + private Map<String, Object> spoutConfig; + private String securityProtocol; + private Config stormConf; + + public ParserInput( ValueSupplier<Integer> spoutParallelism + , ValueSupplier<Integer> spoutNumTasks + , ValueSupplier<Integer> parserParallelism + , ValueSupplier<Integer> parserNumTasks + , ValueSupplier<Integer> errorParallelism + , ValueSupplier<Integer> errorNumTasks + , ValueSupplier<Map> spoutConfig + , ValueSupplier<String> securityProtocol + , ValueSupplier<Config> stormConf + , SensorParserConfig config + ) + { + this.spoutParallelism = spoutParallelism.get(config, Integer.class); + this.spoutNumTasks = spoutNumTasks.get(config, Integer.class); + 
this.parserParallelism = parserParallelism.get(config, Integer.class); + this.parserNumTasks = parserNumTasks.get(config, Integer.class); + this.errorParallelism = errorParallelism.get(config, Integer.class); + this.errorNumTasks = errorNumTasks.get(config, Integer.class); + this.spoutConfig = spoutConfig.get(config, Map.class); + this.securityProtocol = securityProtocol.get(config, String.class); + this.stormConf = stormConf.get(config, Config.class); + } + + public Integer getSpoutParallelism() { + return spoutParallelism; + } + + public Integer getSpoutNumTasks() { + return spoutNumTasks; + } + + public Integer getParserParallelism() { + return parserParallelism; + } + + public Integer getParserNumTasks() { + return parserNumTasks; + } + + public Integer getErrorParallelism() { + return errorParallelism; + } + + public Integer getErrorNumTasks() { + return errorNumTasks; + } + + public Map<String, Object> getSpoutConfig() { + return spoutConfig; + } + + public String getSecurityProtocol() { + return securityProtocol; + } + + public Config getStormConf() { + return stormConf; + } + } + /** +{ + "parserClassName": "org.apache.metron.parsers.GrokParser", + "sensorTopic": "squid", + "parserConfig": { + "grokPath": "/patterns/squid", + "patternLabel": "SQUID_DELIMITED", + "timestampField": "timestamp" + }, + "fieldTransformations" : [ + { + "transformation" : "STELLAR" + ,"output" : [ "full_hostname", "domain_without_subdomains" ] + ,"config" : { + "full_hostname" : "URL_TO_HOST(url)" + ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)" + } + } + ] +} + */ + @Multiline + public static String baseConfig; + private static SensorParserConfig getBaseConfig() { + try { + return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class); + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Test + public void testSpoutParallelism() throws Exception { + 
testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM + , "10" + , input -> input.getSpoutParallelism().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSpoutParallelism(20); + return config; + } + , input -> input.getSpoutParallelism().equals(20) + ); + } + + @Test + public void testSpoutNumTasks() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS + , "10" + , input -> input.getSpoutNumTasks().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSpoutNumTasks(20); + return config; + } + , input -> input.getSpoutNumTasks().equals(20) + ); + } + + @Test + public void testParserParallelism() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM + , "10" + , input -> input.getParserParallelism().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setParserParallelism(20); + return config; + } + , input -> input.getParserParallelism().equals(20) + ); + } + + @Test + public void testParserNumTasks() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS + , "10" + , input -> input.getParserNumTasks().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setParserNumTasks(20); + return config; + } + , input -> input.getParserNumTasks().equals(20) + ); + } + + @Test + public void testErrorParallelism() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM + , "10" + , input -> input.getErrorParallelism().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setErrorWriterParallelism(20); + return config; + } + , input -> input.getErrorParallelism().equals(20) + ); + } + + @Test + public void testErrorNumTasks() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_NUM_TASKS + , "10" + , input -> input.getErrorNumTasks().equals(10) + , () -> { + 
SensorParserConfig config = getBaseConfig(); + config.setErrorWriterNumTasks(20); + return config; + } + , input -> input.getErrorNumTasks().equals(20) + ); + } + + @Test + public void testSecurityProtocol_fromCLI() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL + , "PLAINTEXT" + , input -> input.getSecurityProtocol().equals("PLAINTEXT") + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSecurityProtocol("KERBEROS"); + return config; + } + , input -> input.getSecurityProtocol().equals("KERBEROS") + ); + } + + @Test + public void testSecurityProtocol_fromSpout() throws Exception { + //Ultimately the order of precedence is CLI > spout config > parser config + File extraConfig = File.createTempFile("spoutConfig", "json"); + extraConfig.deleteOnExit(); + writeMap(extraConfig, new HashMap<String, Object>() {{ + put("security.protocol", "PLAINTEXTSASL"); + }}); + { + //Ensure that the CLI spout config takes precedence + + testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{ + put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath()); + put(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL, "PLAINTEXT"); + }} + , input -> input.getSecurityProtocol().equals("PLAINTEXT") + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK"); + return config; + } + , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK") + ); + } + { + //Ensure that the spout config takes precedence + testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{ + put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath()); + }} + , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL") + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK"); + return config; + } + 
, input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK") + ); + } + } + + @Test + public void testTopologyConfig_fromConfigExplicitly() throws Exception { + testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) + {{ + put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10"); + put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20"); + }} + , input -> { + Config c = input.getStormConf(); + return (int)c.get(Config.TOPOLOGY_WORKERS) == 10 + && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20; + } + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setNumWorkers(100); + config.setNumAckers(200); + return config; + } + , input -> { + Config c = input.getStormConf(); + return (int)c.get(Config.TOPOLOGY_WORKERS) == 100 + && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200 + ; + } + ); + } + + @Test + public void testTopologyConfig() throws Exception { + File extraConfig = File.createTempFile("topologyConfig", "json"); + extraConfig.deleteOnExit(); + writeMap(extraConfig, new HashMap<String, Object>() {{ + put(Config.TOPOLOGY_DEBUG, true); + }}); + testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) + {{ + put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10"); + put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20"); + put(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraConfig.getAbsolutePath()); + }} + , input -> { + Config c = input.getStormConf(); + return (int)c.get(Config.TOPOLOGY_WORKERS) == 10 + && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20 + && (boolean)c.get(Config.TOPOLOGY_DEBUG); + } + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setStormConfig( + new HashMap<String, Object>() {{ + put(Config.TOPOLOGY_WORKERS, 100); + put(Config.TOPOLOGY_ACKER_EXECUTORS, 200); + }} + ); + return config; + } + , input -> { + Config c = input.getStormConf(); + return (int)c.get(Config.TOPOLOGY_WORKERS) == 100 
+ && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200 + && !c.containsKey(Config.TOPOLOGY_DEBUG); + } + ); + } + + @Test + public void testSpoutConfig() throws Exception { + File extraConfig = File.createTempFile("spoutConfig", "json"); + extraConfig.deleteOnExit(); + writeMap(extraConfig, new HashMap<String, Object>() {{ + put("extra_config", "from_file"); + }}); + EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions = new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) + {{ + put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath()); + }}; + Predicate<ParserInput> cliOverrideExpected = input -> { + return input.getSpoutConfig().get("extra_config").equals("from_file"); + }; + + Predicate<ParserInput> configOverrideExpected = input -> { + return input.getSpoutConfig().get("extra_config").equals("from_zk") + ; + }; + + Supplier<SensorParserConfig> configSupplier = () -> { + SensorParserConfig config = getBaseConfig(); + config.setSpoutConfig( + new HashMap<String, Object>() {{ + put("extra_config", "from_zk"); + }} + ); + return config; + }; + testConfigOption( cliOptions + , cliOverrideExpected + , configSupplier + , configOverrideExpected + ); + } + + private void writeMap(File outFile, Map<String, Object> config) throws IOException { + FileUtils.write(outFile, JSONUtils.INSTANCE.toJSON(config, true)); + } + + private void testConfigOption( ParserTopologyCLI.ParserOptions option + , String cliOverride + , Predicate<ParserInput> cliOverrideCondition + , Supplier<SensorParserConfig> configSupplier + , Predicate<ParserInput> configOverrideCondition + ) throws Exception { + testConfigOption( + new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{ + put(option, cliOverride); + }}, + cliOverrideCondition, + configSupplier, + configOverrideCondition + ); + } + + private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options + , 
Predicate<ParserInput> cliOverrideCondition + , Supplier<SensorParserConfig> configSupplier + , Predicate<ParserInput> configOverrideCondition + ) throws Exception { + //CLI Override + SensorParserConfig config = configSupplier.get(); + { + CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") + .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor"); + for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) { + builder.with(entry.getKey(), entry.getValue()); + } + CommandLine cmd = builder.build(true); + ParserInput input = getInput(cmd, config); + Assert.assertTrue(cliOverrideCondition.test(input)); + } + // Config Override + { + CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") + .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor"); + CommandLine cmd = builder.build(true); + ParserInput input = getInput(cmd, config); + Assert.assertTrue(configOverrideCondition.test(input)); + } + } + + private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception { + final ParserInput[] parserInput = new ParserInput[]{null}; + new ParserTopologyCLI() { + @Override + protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception { + parserInput[0] = new ParserInput( spoutParallelism + , spoutNumTasks + , parserParallelism + , parserNumTasks 
+ , errorParallelism + , errorNumTasks + , spoutConfig + , securityProtocol + , stormConf + , config + ); + return null; + } + }.createParserTopology(cmd); + return parserInput[0]; + } + }