METRON-1533 Create KAFKA_FIND Stellar Function (nickwallen) closes apache/metron#1025
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8202cd24 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8202cd24 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8202cd24 Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: 8202cd2425b699afb5c6fb11cbb59b0fcbf4f82d Parents: edec7b1 Author: nickwallen <n...@nickallen.org> Authored: Fri Jun 8 11:11:57 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Fri Jun 8 11:11:57 2018 -0400 ---------------------------------------------------------------------- .../metron/management/KafkaFunctions.java | 204 +++++++++++++++++-- .../KafkaFunctionsIntegrationTest.java | 98 +++++++++ 2 files changed, 289 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/8202cd24/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index a0c92eb..f256672 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -18,7 +18,9 @@ package org.apache.metron.management; +import org.apache.commons.lang3.ClassUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -28,7 +30,10 @@ import org.apache.kafka.common.TopicPartition; import 
org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.metron.common.system.Clock; +import org.apache.metron.profiler.client.stellar.Util; +import org.apache.metron.stellar.common.LambdaExpression; import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.common.utils.JSONUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.ParseException; import org.apache.metron.stellar.dsl.Stellar; @@ -36,6 +41,7 @@ import org.apache.metron.stellar.dsl.StellarFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; @@ -150,18 +156,18 @@ public class KafkaFunctions { public Object apply(List<Object> args, Context context) throws ParseException { // required - name of the topic to retrieve messages from - String topic = ConversionUtils.convert(args.get(0), String.class); + String topic = getArg("topic", 0, String.class, args); // optional - how many messages should be retrieved? int count = 1; if(args.size() > 1) { - count = ConversionUtils.convert(args.get(1), Integer.class); + count = getArg("count", 1, Integer.class, args); } // optional - property overrides provided by the user Map<String, String> overrides = new HashMap<>(); if(args.size() > 2) { - overrides = ConversionUtils.convert(args.get(2), Map.class); + overrides = getArg("overrides", 2, Map.class, args); } // build the properties for kafka @@ -259,18 +265,18 @@ public class KafkaFunctions { public Object apply(List<Object> args, Context context) throws ParseException { // required - name of the topic to retrieve messages from - String topic = ConversionUtils.convert(args.get(0), String.class); + String topic = getArg("topic", 0, String.class, args); // optional - how many messages should be retrieved? 
int count = 1; if(args.size() > 1) { - count = ConversionUtils.convert(args.get(1), Integer.class); + count = getArg("count", 1, Integer.class, args); } // optional - property overrides provided by the user Map<String, String> overrides = new HashMap<>(); if(args.size() > 2) { - overrides = ConversionUtils.convert(args.get(2), Map.class); + overrides = getArg("overrides", 2, Map.class, args); } Properties properties = buildKafkaProperties(overrides, context); @@ -372,18 +378,18 @@ public class KafkaFunctions { List<String> messages; if(args.get(1) instanceof String) { // a single message needs sent - String msg = ConversionUtils.convert(args.get(1), String.class); + String msg = getArg("message(s)", 1, String.class, args); messages = Collections.singletonList(msg); } else { // a list of messages; all need sent - messages = ConversionUtils.convert(args.get(1), List.class); + messages = getArg("message(s)", 1, List.class, args); } // are there any overrides? Map<String, String> overrides = new HashMap<>(); if(args.size() > 2) { - overrides = ConversionUtils.convert(args.get(2), Map.class); + overrides = getArg("overrides", 2, Map.class, args); } // send the messages @@ -485,7 +491,7 @@ public class KafkaFunctions { // optional - did the user provide any overrides? Map<String, String> overrides = new HashMap<>(); if(args.size() > 0) { - overrides = ConversionUtils.convert(args.get(0), Map.class); + overrides = getArg("overrides", 0, Map.class, args); } return buildKafkaProperties(overrides, context); @@ -504,6 +510,163 @@ public class KafkaFunctions { } /** + * KAFKA_FIND + * + * <p>Finds messages that satisfy a given filter expression. Messages are filtered starting from the + * latest offset of the topic. + * + * <p>Example: Retrieve a 'bro' message. + * <pre> + * {@code + * KAFKA_FIND('topic', m -> MAP_GET('source.type', m) == 'bro') + * } + * </pre> + * + * <p>Example: Find 10 messages that contain geo-location data. 
+ * <pre> + * {@code + * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10) + * } + * </pre> + */ + @Stellar( + namespace = "KAFKA", + name = "FIND", + description = "Find messages that satisfy a given filter expression. Messages are filtered starting from " + + "the latest offset.", + params = { + "topic - The name of the Kafka topic", + "filter - A lambda expression that filters messages. Messages are presented as a map of fields to the expression.", + "count - The number of Kafka messages to retrieve", + "config - Optional map of key/values that override any global properties." + }, + returns = "The messages as a list of strings" + ) + public static class KafkaFind implements StellarFunction { + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + // required - name of the topic to retrieve messages from + String topic = getArg("topic", 0, String.class, args); + + // required - a lambda which filters the messages + LambdaExpression filter = getArg("filter", 1, LambdaExpression.class, args); + + // optional - how many messages should be retrieved? + int count = 1; + if(args.size() > 2) { + count = getArg("count", 2, Integer.class, args); + } + + // optional - property overrides provided by the user + Map<String, String> overrides = new HashMap<>(); + if(args.size() > 3) { + overrides = getArg("overrides", 3, Map.class, args); + } + + Properties properties = buildKafkaProperties(overrides, context); + properties.put("max.poll.records", 10 * count); + + return findMessages(topic, filter, count, properties); + } + + /** + * Find messages in Kafka that satisfy a filter expression. + * + * @param topic The kafka topic. + * @param filter The filter expression. + * @param count The maximum number of messages to find. + * @param properties Function configuration values. + * @return A list of messages that satisfy the filter expression. 
+ */ + private List<Object> findMessages(String topic, LambdaExpression filter, int count, Properties properties) { + final int pollTimeout = getPollTimeout(properties); + final int maxWait = getMaxWait(properties); + + List<Object> messages = new ArrayList<>(); + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { + + // seek to the end of all topic/partitions + Set<TopicPartition> partitions = manualPartitionAssignment(topic, consumer); + consumer.seekToEnd(partitions); + + // continue until we have enough messages or exceeded the max wait time + long wait = 0L; + final long start = clock.currentTimeMillis(); + while(messages.size() < count && wait < maxWait) { + + // poll kafka for messages + ConsumerRecords<String, String> records = consumer.poll(pollTimeout); + for(ConsumerRecord<String, String> record : records) { + + // only keep the message if the filter expression is satisfied + if(isSatisfied(filter, record.value())) { + messages.add(record.value()); + + // do we have enough messages already? + if(messages.size() >= count) { + break; + } + } + } + + // how long have we waited? + wait = clock.currentTimeMillis() - start; + consumer.commitSync(); + + LOG.debug("KAFKA_FIND polled for messages; topic={}, count={}, waitTime={} ms", + topic, messages.size(), wait); + } + } + + return messages; + } + + /** + * Executes a given expression on a message. + * + * @param expr The filter expression to execute. + * @param message The message that the expression is executed on. + * @return Returns true, only if the expression returns true. If the expression + * returns false or fails to execute, false is returned. 
+ */ + public boolean isSatisfied(LambdaExpression expr, String message) { + boolean result = false; + Map<String, Object> messageAsMap; + try { + // transform the message to a map of fields + messageAsMap = JSONUtils.INSTANCE.load(message, JSONUtils.MAP_SUPPLIER); + + // apply the filter expression + Object out = expr.apply(Collections.singletonList(messageAsMap)); + if(out instanceof Boolean) { + result = (Boolean) out; + + } else { + LOG.error("Expected boolean from filter expression, got {}", ClassUtils.getShortClassName(out, "null")); + } + + } catch(IOException e) { + LOG.error("Unable to parse message", e); + } + + return result; + } + + + @Override + public void initialize(Context context) { + // no initialization required + } + + @Override + public boolean isInitialized() { + // no initialization required + return true; + } + } + + /** * Manually assigns all partitions in a topic to a consumer * * @param topic The topic whose partitions will be assigned. @@ -511,7 +674,6 @@ public class KafkaFunctions { * @return A set of topic-partitions that were manually assigned to the consumer. */ private static Set<TopicPartition> manualPartitionAssignment(String topic, KafkaConsumer<String, String> consumer) { - // find all partitions for the topic Set<TopicPartition> partitions = new HashSet<>(); for(PartitionInfo partition : consumer.partitionsFor(topic)) { @@ -539,7 +701,6 @@ public class KafkaFunctions { * @param context The Stellar context. */ private static Properties buildKafkaProperties(Map<String, String> overrides, Context context) { - // start with minimal set of default properties Properties properties = new Properties(); properties.putAll(defaultProperties); @@ -599,7 +760,6 @@ public class KafkaFunctions { * via the global properties. 
*/ private static Properties defaultKafkaProperties() { - Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("group.id", "kafka-functions-stellar"); @@ -634,4 +794,22 @@ public class KafkaFunctions { return properties; } + + /** + * Get an argument from a list of arguments. + * + * @param argName The name of the argument. + * @param index The index within the list of arguments. + * @param clazz The type expected. + * @param args All of the arguments. + * @param <T> The type of the argument expected. + */ + public static <T> T getArg(String argName, int index, Class<T> clazz, List<Object> args) { + if(index >= args.size()) { + throw new IllegalArgumentException(format("missing '%s'; expected at least %d argument(s), found %d", + argName, index+1, args.size())); + } + + return ConversionUtils.convert(args.get(index), clazz); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/8202cd24/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index ad45b52..d82bb37 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests the KafkaFunctions class. 
@@ -109,6 +110,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { .withClass(KafkaFunctions.KafkaPut.class) .withClass(KafkaFunctions.KafkaProps.class) .withClass(KafkaFunctions.KafkaTail.class) + .withClass(KafkaFunctions.KafkaFind.class) .withClass(MapFunctions.MapGet.class); } @@ -339,6 +341,102 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_FIND should only return messages that satisfy a filter expression. + */ + @Test + public void testKafkaFind() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // find all messages satisfying the filter expression + Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)"); + + // put 10 messages onto the topic for KAFKA_FIND to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])")); + + // only expect `message2` where value == 23 to be returned + Object actual = future.get(10, TimeUnit.SECONDS); + List<String> expected = Collections.singletonList(message2); + assertEquals(expected, actual); + } + + /** + * KAFKA_FIND should return no messages, if none match the filter expression. + */ + @Test + public void testKafkaFindNone() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // find all messages satisfying the filter expression + Future<Object> future = runAsync("KAFKA_FIND(topic, m -> false)"); + + // put 10 messages onto the topic for KAFKA_FIND to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])")); + + // no messages satisfy the filter expression + Object actual = future.get(10, TimeUnit.SECONDS); + List<String> expected = Collections.emptyList(); + assertEquals(expected, actual); + } + + /** + * KAFKA_FIND should return no more messages than its limit. 
+ */ + @Test + public void testKafkaFindMultiple() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // find all messages satisfying the filter expression + Future<Object> future = runAsync("KAFKA_FIND(topic, m -> true, 2)"); + + // put 10 messages onto the topic for KAFKA_FIND to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])")); + + // all messages should satisfy the filter + List<String> expected = new ArrayList<String>() {{ + add(message2); + add(message2); + }}; + Object actual = future.get(10, TimeUnit.SECONDS); + assertEquals(expected, actual); + } + + /** + * KAFKA_FIND should wait no more than a maximum time before returning, even if no matching + * messages are found. + */ + @Test + public void testKafkaFindExceedsMaxWait() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // write all 3 messages to the topic + run("KAFKA_PUT(topic, [message1, message2, message3])"); + + // execute the test - none of the messages satisfy the filter + long before = System.currentTimeMillis(); + Object actual = run("KAFKA_FIND(topic, m -> false, 10, { 'stellar.kafka.max.wait.millis': 1000 })"); + + // expect not to have waited more than roughly 1000 millis + long wait = System.currentTimeMillis() - before; + assertTrue("Expected wait not to exceed max wait; actual wait = " + wait, wait < 2 * 1000); + + // expect no messages + List<String> expected = Collections.emptyList(); + assertEquals(expected, actual); + } + + /** * Runs a Stellar expression. * @param expression The expression to run. */