This is an automated email from the ASF dual-hosted git repository. mmiklavcic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
     new 3b04460  METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
3b04460 is described below

commit 3b0446006c327b7dea494b3a0bcbb9de4f662d5a
Author: tigerquoll <tigerqu...@outlook.com>
AuthorDate: Mon May 20 18:05:50 2019 -0600

    METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
---
 metron-analytics/metron-profiler-spark/README.md   |  61 ++++++++--
 metron-analytics/metron-profiler-spark/pom.xml     |   6 +
 .../profiler/spark/cli/BatchProfilerCLI.java       | 126 +++++++++++++++++++--
 .../spark/cli/BatchProfilerCLIOptions.java         |  15 ++-
 .../src/main/scripts/start_batch_profiler.sh       |  18 ++-
 .../profiler/spark/cli/BatchProfilerCLITest.java   |  28 ++++-
 .../spark/cli/BatchProfilerZKIntegrationTest.java  |  82 ++++++++++++++
 .../apache/metron/integration/TestZKServer.java    |  79 +++++++++++++
 8 files changed, 395 insertions(+), 20 deletions(-)

diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 5ee8510..8750550 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -42,8 +42,7 @@ The portion of a profile produced by the Batch Profiler should be indistinguisha
 For an introduction to the Profiler, see the [Profiler README](../metron-profiler-common/README.md).

 ## Getting Started
-
-1. Create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.

     ```
     cat $METRON_HOME/config/zookeeper/profiler.json
@@ -60,7 +59,8 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
       "timestampField": "timestamp"
     }
     ```
-
+    See [Specifying profiles](#specifying-profiles) for information on how to load profile definitions from zookeeper.
+
 1. Ensure that you have archived telemetry available for the Batch Profiler to consume. By default, Metron will store this in HDFS at `/apps/metron/indexing/indexed/*/*`.

     ```
@@ -80,7 +80,6 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
     ```
     log4j.logger.org.apache.metron.profiler.spark=DEBUG
     ```
-
 1. Run the Batch Profiler.

     ```
@@ -91,6 +90,41 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile

 1. Query for the profile data using the [Profiler Client](../metron-profiler-client/README.md).

+## Specifying profiles
+
+The profile to use for batch processing can be specified as either a JSON file on disk
+or by utilizing a profile already loaded into zookeeper for use by the streaming profiler.
+
+### Loading a profile from disk
+
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
+
+    ```
+    cat $METRON_HOME/config/zookeeper/profiler.json
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "foreach": "'global'",
+          "init": { "count": "0" },
+          "update": { "count": "count + 1" },
+          "result": "count"
+        }
+      ],
+      "timestampField": "timestamp"
+    }
+    ```
+1. When launching the batch profiler directly, use the `--profiles <path to profiler.json>` option.
+If using the wrapper script to launch the batch profiler, it will automatically add the command argument
+`--profiles $METRON_HOME/config/zookeeper/profiler.json ` to the batch launching process if `$SPARK_PROFILER_USE_ZOOKEEPER` is not defined.
+
+### Loading a profile from zookeeper
+
+Choose to use profiles already loaded into zookeeper (e.g. for use by the streaming profiler) by setting the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER`.
+This will cause the wrapper script to add `--zookeeper $ZOOKEEPER` to the batch launching process,
+which will cause the spark profiler to extract profiles from the zookeeper quorum located at `$ZOOKEEPER`.
+
+
 ## Installation

 The Batch Profiler package is installed automatically when installing Metron using the Ambari MPack. See the following notes when installing the Batch Profiler without the Ambari MPack.
@@ -147,9 +181,11 @@ The Batch Profiler requires Spark version 2.3.0+.

 A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler. This script makes the following assumptions.

-  * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
-
+  * The script either
+    * builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`. or
+    * utilises the profiles already loaded into zookeeper quorum at `$ZOOKEEPER` if the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER` is set.
   * The properties defined in `$METRON_HOME/config/batch-profiler.properties` are passed to both the Profiler and Spark. You can define both Spark and Profiler properties in this same file.
+  * The script will also configure the event time field to use if the field value is stored in the `${SPARK_PROFILER_EVENT_TIMESTAMP_FIELD}` environment variable.

   * The script assumes that Spark is installed at `/usr/hdp/current/spark2-client`. This can be overridden if you define an environment variable called `SPARK_HOME` prior to executing the script.

@@ -171,6 +207,8 @@ The Batch Profiler accepts the following arguments when run from the command lin
 | Argument | Description
 |--- |---
 | [`-p`, `--profiles`](#--profiles) | Path to the profile definitions.
+| [`-z`, `--zookeeper`](#--zookeeper) | Zookeeper quorum to read profile definitions from.
+| [`-t`, `--timestampfield`](#--timestampfield) | Which data field to use for event time.
 | [`-c`, `--config`](#--config) | Path to the profiler properties file.
 | [`-g`, `--globals`](#--globals) | Path to the Stellar global config file.
 | [`-r`, `--reader`](#--reader) | Path to properties for the DataFrameReader.
@@ -178,7 +216,16 @@ The Batch Profiler accepts the following arguments when run from the command lin

 #### `--profiles`

-The path to a file containing the profile definition in JSON.
+The path to a file containing the profile definition in JSON. Only one of `--zookeeper` or `--profiles` should be used
+
+#### `--zookeeper`
+
+Read profile definitions from the zookeeper quorum at this address. Only one of `--zookeeper` or `--profiles` should be used.
+
+#### `--timestampfield`
+
+Specifies which data field to utilising for event time information. The field to use for event time is usually stored as part of the profile.
+It can be overridden via this setting.

 #### `--config`

diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 40bd551..8b8d510 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -144,6 +144,12 @@
       <version>${global_log4j_core_version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-integration-test</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index 29fe4a2..9171f01 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -17,15 +17,22 @@
  * limitations under the License.
  *
  */
+
 package org.apache.metron.profiler.spark.cli;

+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.spark.BatchProfiler;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
@@ -35,19 +42,23 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
+import java.util.Optional;
 import java.util.Properties;

-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_TIMESTAMP_FLD;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_ZK;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;

 /**
- * The main entry point which launches the Batch Profiler in Spark.
- *
+ * The main entry point which launches the Batch Profiler iin Spark.
+ * Profiles can be read from either files (utilising --profiles)
+ * or zookeeper (utilising --zookeeper)
  * With this class the Batch Profiler can be submitted using the following command.
- *
+ * <p></p>
  * <pre>{@code
  * $SPARK_HOME/bin/spark-submit \
  *     --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
@@ -58,6 +69,17 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
  *     --profiles profiles.json \
  *     --reader reader.properties
  * }</pre>
+ * <p></p>
+ * Or to pull the profile information from zookeeper
+ * <pre>{@code
+ * $SPARK_HOME/bin/spark-submit \
+ *     --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
+ *     --properties-file spark.properties \
+ *     metron-profiler-spark-<version>.jar \
+ *     --globals global.properties \
+ *     --zookeeper ZookeeperQuorumForProfiles
+ *     --reader reader.properties
+ * }</pre>
  */
 public class BatchProfilerCLI implements Serializable {

@@ -68,17 +90,19 @@ public class BatchProfilerCLI implements Serializable {
   public static Properties readerProps;
   public static ProfilerConfig profiles;

-  public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
+  public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException, Exception {
     // parse the command line
     CommandLine commandLine = parseCommandLine(args);
+
+    // read profile information
+    profiles = Preconditions.checkNotNull(handleProfileDefinitions(commandLine), "An error occurred while reading profile data");
     profilerProps = handleProfilerProperties(commandLine);
     globals = handleGlobals(commandLine);
-    profiles = handleProfileDefinitions(commandLine);
     readerProps = handleReaderProperties(commandLine);

     // the batch profiler must use 'event time'
     if(!profiles.getTimestampField().isPresent()) {
-      throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined.");
+      throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined in the profile definitions file or via the --timestampField argument.");
     }

     // one or more profiles must be defined
@@ -97,6 +121,54 @@ public class BatchProfilerCLI implements Serializable {
   }

   /**
+   * Extracts profile information from a file or from zookeeper
+   * @param commandLine Command line information.
+   * @return Profile information
+   * @throws MissingOptionException if command line options are missing
+   * @throws IOException If there are disk or network issues retrieving profiles
+   */
+  private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws MissingOptionException, IOException {
+    final String PROFILE_LOCATION_ERROR =
+            "A single profile location (--profiles or --zookeeper) must be specified";
+    ProfilerConfig profiles;
+
+    if ((!PROFILE_ZK.has(commandLine)) && (!PROFILE_DEFN_FILE.has(commandLine))) {
+      throw new MissingOptionException(PROFILE_LOCATION_ERROR);
+    }
+    if (PROFILE_ZK.has(commandLine) && PROFILE_DEFN_FILE.has(commandLine)) {
+      throw new IllegalArgumentException(PROFILE_LOCATION_ERROR);
+    }
+
+    if (PROFILE_ZK.has(commandLine)) {
+      profiles = handleProfileDefinitionsZK(commandLine);
+    } else {
+      profiles = handleProfileDefinitionsFile(commandLine);
+    }
+
+    // event time can specified via command line override
+    if (PROFILE_TIMESTAMP_FLD.has(commandLine)) {
+      final String timestampField = PROFILE_TIMESTAMP_FLD.get(commandLine);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(timestampField), "timestampField must be not be empty if specified");
+      profiles.setTimestampField(timestampField);
+    }
+    LOG.info("Utilising profile: {}", profiles.toString());
+    return profiles;
+  }
+
+  /**
+   * Loads Zookeeper client if one is configured.
+   * @param zkQuorum Address if zookeeper server
+   * @return CuratorFramework client if zookeeper configuration defined
+   */
+  private static CuratorFramework createZKClient(final String zkQuorum) {
+    LOG.info("Loading profiler properties from zookeeper quorum '{}'", zkQuorum);
+    final CuratorFramework zkClient = ZKCache.createClient(zkQuorum, Optional.empty());
+    zkClient.start();
+    LOG.info("Zookeeper client created successfully");
+    return zkClient;
+  }
+
+  /**
    * Load the Stellar globals from a file.
    *
    * @param commandLine The command line.
@@ -155,7 +227,7 @@ public class BatchProfilerCLI implements Serializable {
    *
    * @param commandLine The command line.
    */
-  private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws IOException {
+  private static ProfilerConfig handleProfileDefinitionsFile(CommandLine commandLine) throws IOException {
     ProfilerConfig profiles;
     if(PROFILE_DEFN_FILE.has(commandLine)) {
       String profilePath = PROFILE_DEFN_FILE.get(commandLine);
@@ -173,9 +245,45 @@ public class BatchProfilerCLI implements Serializable {
   }

   /**
+   * Load the profile definitions from ZK server identified in command line
+   * @param commandLine Address of Zookeeper server
+   * @return ProfileConfig object stored in zookeeper
+   * @throws IOException if error occurs during zookeeper read
+   */
+  private static ProfilerConfig handleProfileDefinitionsZK(final CommandLine commandLine) throws IOException {
+    Preconditions.checkArgument(PROFILE_ZK.has(commandLine));
+    ProfilerConfig profiles;
+    final String zkQuorum = PROFILE_ZK.get(commandLine);
+    try (final CuratorFramework zkClient = createZKClient(zkQuorum)) {
+      profiles = readProfileFromZK(zkClient);
+    }
+    return profiles;
+  }
+
+  /**
+   * Reads profile information utilizing the passed zookeeper client
+   * @param zkClient Started zookeeper client
+   * @throws IOException if error occurs while reading profile information from zookeeper
+   */
+  static ProfilerConfig readProfileFromZK(CuratorFramework zkClient) throws IOException {
+    ProfilerConfig profiles;
+    try {
+      LOG.info("Loading profiles from zookeeper");
+      profiles = ConfigurationsUtils.readProfilerConfigFromZookeeper(zkClient);
+      LOG.info("Loaded {} profile(s)", profiles.getProfiles().size());
+    } catch (Exception ex) {
+      throw new IOException(
+              String.format("Error reading configuration from Zookeeper client %s",
+                      zkClient.toString()),
+              ex);
+    }
+    return profiles;
+  }
+
+  /**
    * Parse the command line arguments submitted by the user.
    * @param args The command line arguments to parse.
-   * @throws org.apache.commons.cli.ParseException
+   * @throws org.apache.commons.cli.ParseException if command line has errors
    */
   private static CommandLine parseCommandLine(String[] args) throws ParseException {
     CommandLineParser parser = new PosixParser();
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index d58728a..5d66d3b 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -17,6 +17,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.metron.profiler.spark.cli;

 import com.google.common.base.Joiner;
@@ -34,10 +35,22 @@ import java.util.function.Supplier;
  * Profiler.
  */
 public enum BatchProfilerCLIOptions {
+  PROFILE_ZK(() -> {
+    Option o = new Option("z", "zookeeper", true, "Zookeeper quorum for profile definitions");
+    o.setRequired(false);
+    return o;
+  }),
+
+  PROFILE_TIMESTAMP_FLD(() -> {
+    Option o = new Option("t", "timestampfield", true,
+            "The name of a field to source event time from");
+    o.setRequired(false);
+    return o;
+  }),

   PROFILE_DEFN_FILE(() -> {
     Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
-    o.setRequired(true);
+    o.setRequired(false);
     return o;
   }),

diff --git a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
index c489af7..33d9aa4 100644
--- a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
+++ b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
@@ -21,12 +21,26 @@ METRON_HOME=/usr/metron/${METRON_VERSION}
 PROFILER_JAR=${METRON_HOME}/lib/${project.artifactId}-${METRON_VERSION}.jar
 MAIN_CLASS=org.apache.metron.profiler.spark.cli.BatchProfilerCLI
 PROFILER_PROPS=${PROFILER_PROPS:-"${METRON_HOME}/config/batch-profiler.properties"}
-PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
 SPARK_HOME=${SPARK_HOME:-"/usr/hdp/current/spark2-client"}
+PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
+ZOOKEEPER_LOCATION=${ZOOKEEPER:-"node1:2181"}
+
+# allow for an override on event time source via environment variable
+if [ -n "$SPARK_PROFILER_EVENT_TIMESTAMP_FIELD" ]; then
+  EVENT_TIMESTAMP="--timestampfield ${SPARK_PROFILER_EVENT_TIMESTAMP_FIELD}"
+fi
+
+if [ -n "$SPARK_PROFILER_USE_ZOOKEEPER" ]; then
+  PROFILES_LOCATION="--zookeeper ${ZOOKEEPER_LOCATION}"
+else
+  PROFILES_LOCATION="--profiles ${PROFILES_FILE}"
+fi
+

 ${SPARK_HOME}/bin/spark-submit \
     --class ${MAIN_CLASS} \
     --properties-file ${PROFILER_PROPS} \
     ${PROFILER_JAR} \
     --config ${PROFILER_PROPS} \
-    --profiles ${PROFILES_FILE}
+    ${PROFILES_LOCATION} ${EVENT_TIMESTAMP} \
+    "$@"
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
index c27495e..5be195a 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
@@ -19,6 +19,7 @@

 package org.apache.metron.profiler.spark.cli;

+import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.cli.MissingOptionException;
 import org.junit.Test;

@@ -40,7 +41,8 @@ public class BatchProfilerCLITest {
   }

   /**
-   * The user must define the -p, --profiles option. The Profiler cannot work without profiles.
+   * The user must define the -p, --profiles, -z, --zookeeper options.
+   * The Profiler cannot work without profiles.
    */
   @Test(expected = MissingOptionException.class)
   public void mustDefineProfilesOption() throws Exception {
@@ -49,6 +51,30 @@ public class BatchProfilerCLITest {
   }

   /**
+   * The user must define one of -p, --profiles, -z, --zookeeper options.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void mustDefineOnlyOneProfilesOption() throws Exception {
+    String[] args = new String[] {
+            "--profiles", "src/test/resources/profiles-no-timestamp-field.json",
+            "--zookeeper", "node1:2181"
+    };
+    BatchProfilerCLI.main(args);
+  }
+
+  /**
+   * If a timestamp option is given, it must contain a field name
+   */
+  @Test(expected = MissingArgumentException.class)
+  public void mustDefineFieldnametoGoWithTimestamp() throws Exception {
+    String[] args = new String[] {
+            "--timestampfield"
+    };
+    BatchProfilerCLI.main(args);
+  }
+
+
+  /**
    * If the profile definition contains no valid profiles, we have a problem.
    */
   @Test(expected = IllegalArgumentException.class)
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
new file mode 100644
index 0000000..d77e775
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.cli;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.integration.TestZKServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class BatchProfilerZKIntegrationTest {
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init": { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result": "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profile;
+
+  @Test
+  public void testProfilerZookeeperIntegration() throws Exception {
+    final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+    final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+
+    TestZKServer.runWithZK( (zkServer, zkClient) -> {
+      // write bytes to zookeeper
+      ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+      // read bytes from zookeeper utilizing Batch Profiler functions
+      final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+      // compare expected values
+      Assert.assertEquals("Profile read from zookeeper has changes", expectedProfileConfig, profiles);
+    });
+  }
+
+  @Test
+  public void testProfileZookeeperIntegrationFails() throws Exception {
+    final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+    final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+    expectedProfileConfig.setTimestampField("foobar");
+
+    TestZKServer.runWithZK( (zkServer, zkClient) -> {
+      // write bytes to zookeeper
+      ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+      // read bytes from zookeeper utilizing Batch Profiler functions
+      final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+      // compare expected values
+      Assert.assertNotEquals("Profile zookeeper integration test fails to detect change", expectedProfileConfig, profiles);
+    });
+  }
+}
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
new file mode 100644
index 0000000..9ce02e4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration;
+
+import java.io.Closeable;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.stellar.common.configuration.ConfigurationsUtils;
+
+
+/**
+ * Closable wrapper around ZKServerComponent so it can be cleanly used in a resource protection block
+ */
+public class TestZKServer implements Closeable {
+  private ZKServerComponent testZkServer;
+  private String zookeeperUrl;
+
+  /**
+   * BiConsuming interface that allows Exceptions to be thrown
+   */
+  @FunctionalInterface
+  public interface ThrowingBiConsumer<T, U> {
+    void accept(T t, U u) throws Exception;
+  }
+
+  /**
+   * Utility method to allow lambdas to automatically be fed a started Zookeeper client and server
+   * which are automatically cleaned up after the lambda finishes running or throws an exception
+   * @param testFunc Lambda containing the code to run with the zookeeper client/server
+   * @throws Exception Any exceptions thrown by the 'testFunc' lambda will bubble up the call chain
+   */
+  static public void runWithZK(ThrowingBiConsumer<TestZKServer,CuratorFramework> testFunc) throws Exception {
+    try (TestZKServer zkServer = new TestZKServer();
+         CuratorFramework zkClient = zkServer.newClient()) {
+      zkClient.start();
+      testFunc.accept(zkServer, zkClient);
+    }
+  }
+
+  public TestZKServer() throws UnableToStartException {
+    testZkServer = new ZKServerComponent();
+    testZkServer.start();
+    zookeeperUrl = testZkServer.getConnectionString();
+  }
+
+  public String getZookeeperUrl() {
+    return zookeeperUrl;
+  }
+
+  /**
+   * Create a new zookeeper client configured to use our test Zookeeper server
+   * @return CuratorFramework client
+   */
+  public CuratorFramework newClient() {
+    return ConfigurationsUtils.getClient(zookeeperUrl);
+  }
+
+  @Override
+  public void close() {
+    testZkServer.stop();
+    testZkServer.reset();
+  }
+}
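
The option rules this commit adds in `BatchProfilerCLI.handleProfileDefinitions` (exactly one of `--profiles` or `--zookeeper`, plus an optional `--timestampfield` override) can be illustrated without Spark, Curator, or commons-cli. The following is only a rough sketch under stated assumptions, not the committed code: the class `ProfileSourceSketch`, its `Source` enum, and both method names are hypothetical, and it throws `IllegalArgumentException` for every error, whereas the commit throws `MissingOptionException` when neither location is given.

```java
import java.util.Optional;

/**
 * Hypothetical, dependency-free sketch of the option rules in this commit.
 * None of these names exist in Metron; they are for illustration only.
 */
public class ProfileSourceSketch {

  /** Where the profile definitions would be loaded from. */
  public enum Source { FILE, ZOOKEEPER }

  /**
   * Exactly one profile location must be supplied.
   * Assumption: both error cases are reported as IllegalArgumentException here.
   */
  public static Source resolveSource(Optional<String> profilesPath, Optional<String> zkQuorum) {
    // both present or both absent: no single, unambiguous location
    if (profilesPath.isPresent() == zkQuorum.isPresent()) {
      throw new IllegalArgumentException(
          "A single profile location (--profiles or --zookeeper) must be specified");
    }
    return zkQuorum.isPresent() ? Source.ZOOKEEPER : Source.FILE;
  }

  /**
   * A command-line override, when present, wins over the field stored in the profile.
   * An empty override is rejected rather than silently ignored.
   */
  public static Optional<String> effectiveTimestampField(Optional<String> fromProfile,
                                                         Optional<String> override) {
    if (override.isPresent()) {
      if (override.get().isEmpty()) {
        throw new IllegalArgumentException("timestampField must not be empty if specified");
      }
      return override;
    }
    return fromProfile;
  }

  public static void main(String[] args) {
    // mirrors the wrapper script when SPARK_PROFILER_USE_ZOOKEEPER is set
    Source source = resolveSource(Optional.empty(), Optional.of("node1:2181"));
    Optional<String> field = effectiveTimestampField(Optional.of("timestamp"), Optional.empty());
    System.out.println(source + " " + field.orElse("<none>"));
  }
}
```

In the commit itself these decisions are made against the parsed `CommandLine`, and `start_batch_profiler.sh` chooses which flag to pass based on `$SPARK_PROFILER_USE_ZOOKEEPER`.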