METRON-1533 Create KAFKA_FIND Stellar Function (nickwallen) closes 
apache/metron#1025


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8202cd24
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8202cd24
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8202cd24

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 8202cd2425b699afb5c6fb11cbb59b0fcbf4f82d
Parents: edec7b1
Author: nickwallen <n...@nickallen.org>
Authored: Fri Jun 8 11:11:57 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Fri Jun 8 11:11:57 2018 -0400

----------------------------------------------------------------------
 .../metron/management/KafkaFunctions.java       | 204 +++++++++++++++++--
 .../KafkaFunctionsIntegrationTest.java          |  98 +++++++++
 2 files changed, 289 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/8202cd24/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index a0c92eb..f256672 100644
--- 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -18,7 +18,9 @@
 
 package org.apache.metron.management;
 
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -28,7 +30,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.metron.common.system.Clock;
+import org.apache.metron.profiler.client.stellar.Util;
+import org.apache.metron.stellar.common.LambdaExpression;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.common.utils.JSONUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.stellar.dsl.Stellar;
@@ -36,6 +41,7 @@ import org.apache.metron.stellar.dsl.StellarFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -150,18 +156,18 @@ public class KafkaFunctions {
     public Object apply(List<Object> args, Context context) throws 
ParseException {
 
       // required - name of the topic to retrieve messages from
-      String topic = ConversionUtils.convert(args.get(0), String.class);
+      String topic = getArg("topic", 0, String.class, args);
 
       // optional - how many messages should be retrieved?
       int count = 1;
       if(args.size() > 1) {
-        count = ConversionUtils.convert(args.get(1), Integer.class);
+        count = getArg("count", 1, Integer.class, args);
       }
 
       // optional - property overrides provided by the user
       Map<String, String> overrides = new HashMap<>();
       if(args.size() > 2) {
-        overrides = ConversionUtils.convert(args.get(2), Map.class);
+        overrides = getArg("overrides", 2, Map.class, args);
       }
 
       // build the properties for kafka
@@ -259,18 +265,18 @@ public class KafkaFunctions {
     public Object apply(List<Object> args, Context context) throws 
ParseException {
 
       // required - name of the topic to retrieve messages from
-      String topic = ConversionUtils.convert(args.get(0), String.class);
+      String topic = getArg("topic", 0, String.class, args);
 
       // optional - how many messages should be retrieved?
       int count = 1;
       if(args.size() > 1) {
-        count = ConversionUtils.convert(args.get(1), Integer.class);
+        count = getArg("count", 1, Integer.class, args);
       }
 
       // optional - property overrides provided by the user
       Map<String, String> overrides = new HashMap<>();
       if(args.size() > 2) {
-        overrides = ConversionUtils.convert(args.get(2), Map.class);
+        overrides = getArg("overrides", 2, Map.class, args);
       }
 
       Properties properties = buildKafkaProperties(overrides, context);
@@ -372,18 +378,18 @@ public class KafkaFunctions {
       List<String> messages;
       if(args.get(1) instanceof String) {
         // a single message needs sent
-        String msg = ConversionUtils.convert(args.get(1), String.class);
+        String msg = getArg("message(s)", 1, String.class, args);
         messages = Collections.singletonList(msg);
 
       } else {
         // a list of messages; all need sent
-        messages = ConversionUtils.convert(args.get(1), List.class);
+        messages = getArg("message(s)", 1, List.class, args);
       }
 
       // are there any overrides?
       Map<String, String> overrides = new HashMap<>();
       if(args.size() > 2) {
-        overrides = ConversionUtils.convert(args.get(2), Map.class);
+        overrides = getArg("overrides", 2, Map.class, args);
       }
 
       // send the messages
@@ -485,7 +491,7 @@ public class KafkaFunctions {
       // optional - did the user provide any overrides?
       Map<String, String> overrides = new HashMap<>();
       if(args.size() > 0) {
-        overrides = ConversionUtils.convert(args.get(0), Map.class);
+        overrides = getArg("overrides", 0, Map.class, args);
       }
 
       return buildKafkaProperties(overrides, context);
@@ -504,6 +510,163 @@ public class KafkaFunctions {
   }
 
   /**
+   * KAFKA_FIND
+   *
+   * <p>Finds messages that satisfy a given filter expression. Each call 
seeks to the latest offset of the topic
+   * and filters only the messages that arrive after that point.
+   *
+   * <p>Example: Retrieve a 'bro' message.
+   * <pre>
+   * {@code
+   * KAFKA_FIND('topic', m -> MAP_GET('source.type', m) == 'bro')
+   * }
+   * </pre>
+   *
+   * <p>Example: Find 10 messages that contain geo-location data.
+   * <pre>
+   * {@code
+   * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10)
+   * }
+   * </pre>
+   */
+  @Stellar(
+          namespace = "KAFKA",
+          name = "FIND",
+          description = "Find messages that satisfy a given filter expression. 
Messages are filtered starting from " +
+                  "the latest offset.",
+          params = {
+                  "topic - The name of the Kafka topic",
+                  "filter - A lambda expression that filters messages. 
Messages are presented as a map of fields to the expression.",
+                  "count - The number of Kafka messages to retrieve",
+                  "config - Optional map of key/values that override any 
global properties."
+          },
+          returns = "The messages as a list of strings"
+  )
+  public static class KafkaFind implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      // required - name of the topic to retrieve messages from
+      String topic = getArg("topic", 0, String.class, args);
+
+      // required - a lambda which filters the messages
+      LambdaExpression filter = getArg("filter", 1, LambdaExpression.class, 
args);
+
+      // optional - how many messages should be retrieved?
+      int count = 1;
+      if(args.size() > 2) {
+        count = getArg("count", 2, Integer.class, args);
+      }
+
+      // optional - property overrides provided by the user
+      Map<String, String> overrides = new HashMap<>();
+      if(args.size() > 3) {
+        overrides = getArg("overrides", 3, Map.class, args);
+      }
+
+      Properties properties = buildKafkaProperties(overrides, context);
+      properties.put("max.poll.records", 10 * count);
+
+      return findMessages(topic, filter, count, properties);
+    }
+
+    /**
+     * Find messages in Kafka that satisfy a filter expression.
+     *
+     * @param topic The kafka topic.
+     * @param filter The filter expression.
+     * @param count The maximum number of messages to find.
+     * @param properties Function configuration values.
+     * @return A list of messages that satisfy the filter expression.
+     */
+    private List<Object> findMessages(String topic, LambdaExpression filter, 
int count, Properties properties) {
+      final int pollTimeout = getPollTimeout(properties);
+      final int maxWait = getMaxWait(properties);
+
+      List<Object> messages = new ArrayList<>();
+      try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(properties)) {
+
+        // seek to the end of all topic/partitions
+        Set<TopicPartition> partitions = manualPartitionAssignment(topic, 
consumer);
+        consumer.seekToEnd(partitions);
+
+        // continue until we have enough messages or exceeded the max wait time
+        long wait = 0L;
+        final long start = clock.currentTimeMillis();
+        while(messages.size() < count && wait < maxWait) {
+
+          // poll kafka for messages
+          ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
+          for(ConsumerRecord<String, String> record : records) {
+
+            // only keep the message if the filter expression is satisfied
+            if(isSatisfied(filter, record.value())) {
+              messages.add(record.value());
+
+              // do we have enough messages already?
+              if(messages.size() >= count) {
+                break;
+              }
+            }
+          }
+
+          // how long have we waited?
+          wait = clock.currentTimeMillis() - start;
+          consumer.commitSync();
+
+          LOG.debug("KAFKA_FIND polled for messages; topic={}, count={}, 
waitTime={} ms",
+                  topic, messages.size(), wait);
+        }
+      }
+
+      return messages;
+    }
+
+    /**
+     * Executes a given expression on a message.
+     *
+     * @param expr The filter expression to execute.
+     * @param message The message that the expression is executed on.
+     * @return Returns true, only if the expression returns true.  If the 
expression
+     * returns false or fails to execute, false is returned.
+     */
+    public boolean isSatisfied(LambdaExpression expr, String message) {
+      boolean result = false;
+      Map<String, Object> messageAsMap;
+      try {
+        // transform the message to a map of fields
+        messageAsMap = JSONUtils.INSTANCE.load(message, 
JSONUtils.MAP_SUPPLIER);
+
+        // apply the filter expression
+        Object out = expr.apply(Collections.singletonList(messageAsMap));
+        if(out instanceof Boolean) {
+          result = (Boolean) out;
+
+        } else {
+          LOG.error("Expected boolean from filter expression, got {}", 
ClassUtils.getShortClassName(out, "null"));
+        }
+
+      } catch(IOException e) {
+        LOG.error("Unable to parse message", e);
+      }
+
+      return result;
+    }
+
+
+    @Override
+    public void initialize(Context context) {
+      // no initialization required
+    }
+
+    @Override
+    public boolean isInitialized() {
+      // no initialization required
+      return true;
+    }
+  }
+
+  /**
    * Manually assigns all partitions in a topic to a consumer
    *
    * @param topic The topic whose partitions will be assigned.
@@ -511,7 +674,6 @@ public class KafkaFunctions {
    * @return A set of topic-partitions that were manually assigned to the 
consumer.
    */
   private static Set<TopicPartition> manualPartitionAssignment(String topic, 
KafkaConsumer<String, String> consumer) {
-
     // find all partitions for the topic
     Set<TopicPartition> partitions = new HashSet<>();
     for(PartitionInfo partition : consumer.partitionsFor(topic)) {
@@ -539,7 +701,6 @@ public class KafkaFunctions {
    * @param context The Stellar context.
    */
   private static Properties buildKafkaProperties(Map<String, String> 
overrides, Context context) {
-
     // start with minimal set of default properties
     Properties properties = new Properties();
     properties.putAll(defaultProperties);
@@ -599,7 +760,6 @@ public class KafkaFunctions {
    * via the global properties.
    */
   private static Properties defaultKafkaProperties() {
-
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "localhost:9092");
     properties.put("group.id", "kafka-functions-stellar");
@@ -634,4 +794,22 @@ public class KafkaFunctions {
 
     return properties;
   }
+
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param argName The name of the argument.
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(String argName, int index, Class<T> clazz, 
List<Object> args) {
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("missing '%s'; expected at 
least %d argument(s), found %d",
+              argName, index+1, args.size()));
+    }
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8202cd24/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index ad45b52..d82bb37 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the KafkaFunctions class.
@@ -109,6 +110,7 @@ public class KafkaFunctionsIntegrationTest extends 
BaseIntegrationTest {
             .withClass(KafkaFunctions.KafkaPut.class)
             .withClass(KafkaFunctions.KafkaProps.class)
             .withClass(KafkaFunctions.KafkaTail.class)
+            .withClass(KafkaFunctions.KafkaFind.class)
             .withClass(MapFunctions.MapGet.class);
   }
 
@@ -339,6 +341,102 @@ public class KafkaFunctionsIntegrationTest extends 
BaseIntegrationTest {
   }
   
   /**
+   * KAFKA_FIND should only return messages that satisfy a filter expression.
+   */
+  @Test
+  public void testKafkaFind() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // find all messages satisfying the filter expression
+    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', 
m) == 23)");
+
+    // put 10 messages onto the topic for KAFKA_FIND to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));
+
+    // only expect `message2` where value == 23 to be returned
+    Object actual = future.get(10, TimeUnit.SECONDS);
+    List<String> expected = Collections.singletonList(message2);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * KAFKA_FIND should return no messages, if none match the filter expression.
+   */
+  @Test
+  public void testKafkaFindNone() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // find all messages satisfying the filter expression
+    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> false)");
+
+    // put 10 messages onto the topic for KAFKA_FIND to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));
+
+    // no messages satisfy the filter expression
+    Object actual = future.get(10, TimeUnit.SECONDS);
+    List<String> expected = Collections.emptyList();
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * KAFKA_FIND should return no more messages than its limit.
+   */
+  @Test
+  public void testKafkaFindMultiple() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // find all messages satisfying the filter expression
+    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> true, 2)");
+
+    // put 10 messages onto the topic for KAFKA_FIND to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));
+
+    // all messages should satisfy the filter
+    List<String> expected = new ArrayList<String>() {{
+      add(message2);
+      add(message2);
+    }};
+    Object actual = future.get(10, TimeUnit.SECONDS);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * KAFKA_FIND should wait no more than a maximum time before returning, even 
if no matching
+   * messages are found.
+   */
+  @Test
+  public void testKafkaFindExceedsMaxWait() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // write all 3 messages to the topic
+    run("KAFKA_PUT(topic, [message1, message2, message3])");
+
+    // execute the test - none of the messages satisfy the filter
+    long before = System.currentTimeMillis();
+    Object actual = run("KAFKA_FIND(topic, m -> false, 10, { 
'stellar.kafka.max.wait.millis': 1000 })");
+
+    // expect not to have waited more than roughly 1000 millis
+    long wait = System.currentTimeMillis() - before;
+    assertTrue("Expected wait not to exceed max wait; actual wait = " + wait, 
wait < 2 * 1000);
+
+    // expect no messages
+    List<String> expected = Collections.emptyList();
+    assertEquals(expected, actual);
+  }
+
+  /**
    * Runs a Stellar expression.
    * @param expression The expression to run.
    */

Reply via email to