This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push: new 01f6107 METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544 01f6107 is described below commit 01f6107438a615612c378b708677be778ed4dad8 Author: nickwallen <nickal...@apache.org> AuthorDate: Wed Oct 30 08:23:52 2019 -0400 METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544 --- .../metron-common-storm/pom.xml | 2 +- .../metron-elasticsearch-storm/pom.xml | 2 +- metron-platform/metron-integration-test/pom.xml | 2 +- .../components/FluxTopologyComponent.java | 69 +++++++++++----------- metron-platform/metron-pcap-backend/pom.xml | 2 +- .../metron-solr/metron-solr-storm/pom.xml | 2 +- pom.xml | 4 +- 7 files changed, 41 insertions(+), 42 deletions(-) diff --git a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml index 2b9fbed..4426802 100644 --- a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml +++ b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml @@ -72,7 +72,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> - <version>${global_flux_version}</version> + <version>${global_storm_version}</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml index a84e45d..3e71aba 100644 --- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml +++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml @@ -62,7 +62,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> - <version>${global_flux_version}</version> + <version>${global_storm_version}</version> </dependency> <dependency> 
<groupId>org.apache.storm</groupId> diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml index 6bb8291..3906a8f 100644 --- a/metron-platform/metron-integration-test/pom.xml +++ b/metron-platform/metron-integration-test/pom.xml @@ -45,7 +45,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> - <version>${global_flux_version}</version> + <version>${global_storm_version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java index 1a1ceb0..85a05b0 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java @@ -17,22 +17,6 @@ */ package org.apache.metron.integration.components; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.lang.invoke.MethodHandles; -import java.lang.reflect.InvocationTargetException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Comparator; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -55,6 +39,19 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import 
java.lang.invoke.MethodHandles; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + public class FluxTopologyComponent implements InMemoryComponent { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -249,7 +246,7 @@ public class FluxTopologyComponent implements InMemoryComponent { } private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{ - TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties); + TopologyDef topologyDef = loadYaml(topologyLoc, templateFile, properties); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -267,26 +264,30 @@ public class FluxTopologyComponent implements InMemoryComponent { } } - private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException { - File tmpFile = File.createTempFile(topologyName, "props"); - tmpFile.deleteOnExit(); + /** + * Creates a Storm topology. + * @param yamlFile The Flux file defining the topology. + * @param templateFile The template file used by the Mpack to create the topology's properties. For example, 'enrichment.properties.j2'. + * @param properties The topology properties. + * @return The Storm topology. 
+ * @throws IOException + */ + private static TopologyDef loadYaml(File yamlFile, File templateFile, Properties properties) throws IOException { + Properties topologyProperties; if (templateFile != null) { - try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){ - String templateContents = FileUtils.readFileToString(templateFile); - for(Map.Entry prop: properties.entrySet()) { - String replacePattern = String.format("{{%s}}", prop.getKey()); - templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue()); - } - propWriter.write(templateContents); - propWriter.flush(); - return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + // use the MPack template file (like 'enrichment.properties.j2') to generate the topology properties + String templateContents = FileUtils.readFileToString(templateFile); + for(Map.Entry prop: properties.entrySet()) { + String replacePattern = String.format("{{%s}}", prop.getKey()); + templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue()); } + topologyProperties = new Properties(); + topologyProperties.load(new StringReader(templateContents)); + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, topologyProperties, false); + } else { - try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){ - properties.store(propWriter, topologyName + " properties"); - return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); - } + // otherwise, just use the properties directly + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, properties, false); } - } } diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml index cf49ecf..e21a0b0 100644 --- a/metron-platform/metron-pcap-backend/pom.xml +++ 
b/metron-platform/metron-pcap-backend/pom.xml @@ -56,7 +56,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> - <version>${global_flux_version}</version> + <version>${global_storm_version}</version> </dependency> <dependency> <groupId>org.apache.metron</groupId> diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml index ba46af3..889ff5f 100644 --- a/metron-platform/metron-solr/metron-solr-storm/pom.xml +++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml @@ -114,7 +114,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> - <version>${global_flux_version}</version> + <version>${global_storm_version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> diff --git a/pom.xml b/pom.xml index 9437439..ec5dd4f 100644 --- a/pom.xml +++ b/pom.xml @@ -87,8 +87,6 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- base project versions --> - <base_storm_version>1.0.1</base_storm_version> - <base_flux_version>1.0.1</base_flux_version> <base_kafka_version>0.10.0</base_kafka_version> <base_hadoop_version>2.7.1</base_hadoop_version> <base_flume_version>1.5.2</base_flume_version> @@ -102,7 +100,6 @@ <global_hbase_guava_version>12.0</global_hbase_guava_version> <global_storm_version>1.0.3</global_storm_version> <global_storm_kafka_version>1.2.2</global_storm_kafka_version> - <global_flux_version>${base_flux_version}</global_flux_version> <global_pcap_version>1.7.1</global_pcap_version> <global_hadoop_version>${base_hadoop_version}</global_hadoop_version> <global_flume_version>${base_flume_version}</global_flume_version> @@ -165,6 +162,7 @@ <global_hbase_version>1.1.1</global_hbase_version> <global_hbase_guava_version>12.0</global_hbase_guava_version> <global_storm_kafka_version>1.2.2</global_storm_kafka_version> + 
<base_storm_version>1.0.1</base_storm_version> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> <global_zeppelin_version>0.7.3</global_zeppelin_version>