METRON-1625 Merge master into Solr feature branch (merrimanr) closes apache/metron#1067
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/2bf66503 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/2bf66503 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/2bf66503 Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: 2bf6650327359ded34d1f96540f13051b8e5c471 Parents: a89a72c Author: merrimanr <merrim...@gmail.com> Authored: Wed Jun 20 10:03:44 2018 -0500 Committer: merrimanr <merrim...@gmail.com> Committed: Wed Jun 20 10:03:44 2018 -0500 ---------------------------------------------------------------------- .../elasticsearch/dao/ElasticsearchDao.java | 4 + .../dao/ElasticsearchMetaAlertUpdateDao.java | 32 ++-- .../dao/ElasticsearchMetaAlertDaoTest.java | 50 +++++- .../indexing/dao/metaalert/MetaScoresTest.java | 26 +++ .../metron/management/KafkaFunctions.java | 173 +++++++++++++++++-- .../KafkaFunctionsIntegrationTest.java | 165 +++++++++++++++++- 6 files changed, 418 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 3eb86ce..59f25f0 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -89,6 +89,10 @@ public class ElasticsearchDao implements IndexDao { return accessConfig; } + public void setAccessConfig(AccessConfig accessConfig) { + this.accessConfig = accessConfig; + } + @Override public synchronized void init(AccessConfig config) { if (this.client == null) { http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java index d757dfe..bb79b7a 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java @@ -18,6 +18,7 @@ package org.apache.metron.elasticsearch.dao; +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -48,11 +49,14 @@ import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.QueryBuilder; public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao { + private static final String INDEX_NOT_FOUND_INDICES_KEY = "es.index"; + private ElasticsearchDao elasticsearchDao; private MetaAlertRetrieveLatestDao retrieveLatestDao; private int pageSize; @@ -169,17 +173,23 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda } else { Map<Document, Optional<String>> updates = new HashMap<>(); updates.put(update, index); - // We need to update an alert itself. Only that portion of the update can be delegated. - // We still need to get meta alerts potentially associated with it and update. - Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults() - .stream() - .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), - MetaAlertConstants.METAALERT_TYPE, 0L)) - .collect(Collectors.toList()); - // Each meta alert needs to be updated with the new alert - for (Document metaAlert : metaAlerts) { - if (replaceAlertInMetaAlert(metaAlert, update)) { - updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex())); + try { + // We need to update an alert itself. Only that portion of the update can be delegated. + // We still need to get meta alerts potentially associated with it and update. + Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream() + .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), MetaAlertConstants.METAALERT_TYPE, update.getTimestamp())) + .collect(Collectors.toList()); + // Each meta alert needs to be updated with the new alert + for (Document metaAlert : metaAlerts) { + replaceAlertInMetaAlert(metaAlert, update); + updates.put(metaAlert, Optional.of(METAALERTS_INDEX)); + } + } catch (IndexNotFoundException e) { + List<String> indicesNotFound = e.getMetadata(INDEX_NOT_FOUND_INDICES_KEY); + // If no metaalerts have been created yet and the metaalerts index does not exist, assume no metaalerts exist for alert. + // Otherwise throw the exception. + if (indicesNotFound.size() != 1 || !METAALERTS_INDEX.equals(indicesNotFound.get(0))) { + throw e; } } http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java index a3a5f16..70197ea 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java @@ -18,15 +18,11 @@ package org.apache.metron.elasticsearch.dao; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.HBaseDao; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.MultiIndexDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; @@ -37,8 +33,21 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.index.IndexNotFoundException; import org.junit.Test; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + public class ElasticsearchMetaAlertDaoTest { @@ -131,4 +140,35 @@ public class ElasticsearchMetaAlertDaoTest { createRequest.setAlerts(Collections.singletonList(new GetRequest("don't", "care"))); emaDao.createMetaAlert(createRequest); } + + @Test + public void testUpdateShouldUpdateOnMissingMetaAlertIndex() throws Exception { + ElasticsearchDao elasticsearchDao = mock(ElasticsearchDao.class); + ElasticsearchMetaAlertRetrieveLatestDao elasticsearchMetaAlertRetrieveLatestDao = mock(ElasticsearchMetaAlertRetrieveLatestDao.class); + MetaAlertConfig metaAlertConfig = mock(MetaAlertConfig.class); + ElasticsearchMetaAlertUpdateDao emauDao = spy(new ElasticsearchMetaAlertUpdateDao(elasticsearchDao, elasticsearchMetaAlertRetrieveLatestDao, metaAlertConfig, 1)); + + doThrow(new IndexNotFoundException(ElasticsearchMetaAlertDao.METAALERTS_INDEX)).when(emauDao).getMetaAlertsForAlert("alert_one"); + + Document update = new Document(new HashMap<>(), "alert_one", "", 0L); + emauDao.update(update, Optional.empty()); + + Map<Document, Optional<String>> expectedUpdate = new HashMap<Document, Optional<String>>() {{ + put(update, Optional.empty()); + }}; + verify(elasticsearchDao).batchUpdate(expectedUpdate); + } + + @Test(expected = IndexNotFoundException.class) + public void testUpdateShouldThrowExceptionOnMissingSensorIndex() throws Exception { + ElasticsearchDao elasticsearchDao = mock(ElasticsearchDao.class); + ElasticsearchMetaAlertRetrieveLatestDao elasticsearchMetaAlertRetrieveLatestDao = mock(ElasticsearchMetaAlertRetrieveLatestDao.class); + MetaAlertConfig metaAlertConfig = mock(MetaAlertConfig.class); + ElasticsearchMetaAlertUpdateDao emauDao = spy(new ElasticsearchMetaAlertUpdateDao(elasticsearchDao, elasticsearchMetaAlertRetrieveLatestDao, metaAlertConfig, 1)); + + doThrow(new IndexNotFoundException("bro")).when(emauDao).getMetaAlertsForAlert("alert_one"); + + Document update = new Document(new HashMap<>(), "alert_one", "", 0L); + emauDao.update(update, Optional.empty()); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java index 1359ba9..6ebfad8 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java @@ -23,6 +23,7 @@ import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAAL import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.THREAT_FIELD_DEFAULT; import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.THREAT_SORT_DEFAULT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -30,6 +31,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.metron.common.Constants; +import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.update.Document; import org.junit.Test; @@ -72,4 +76,26 @@ public class MetaScoresTest { // by default, the overall threat score is the sum of all child threat scores assertEquals(30.0F, threatScore); } + + @Test + public void testCalculateMetaScoresWithDifferentFieldName() { + List<Map<String, Object>> alertList = new ArrayList<>(); + + // add an alert with a threat score + alertList.add( Collections.singletonMap(MetaAlertConstants.THREAT_FIELD_DEFAULT, 10.0f)); + + // create the metaalert + Map<String, Object> docMap = new HashMap<>(); + docMap.put(MetaAlertConstants.ALERT_FIELD, alertList); + Document metaalert = new Document(docMap, "guid", MetaAlertConstants.METAALERT_TYPE, 0L); + + // Configure a different threat triage score field name + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{ + put(Constants.THREAT_SCORE_FIELD_PROPERTY, MetaAlertConstants.THREAT_FIELD_DEFAULT); + }}); + + MetaScores.calculateMetaScores(metaalert, MetaAlertConstants.THREAT_FIELD_DEFAULT, MetaAlertConstants.THREAT_SORT_DEFAULT); + assertNotNull(metaalert.getDocument().get(MetaAlertConstants.THREAT_FIELD_DEFAULT)); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index f256672..7c9c23f 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -18,6 +18,7 @@ package org.apache.metron.management; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -30,7 +31,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.metron.common.system.Clock; -import org.apache.metron.profiler.client.stellar.Util; import org.apache.metron.stellar.common.LambdaExpression; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.common.utils.JSONUtils; @@ -66,6 +66,7 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG; * KAFKA_GET * KAFKA_PUT * KAFKA_TAIL + * KAFKA_FIND * KAFKA_PROPS */ public class KafkaFunctions { @@ -98,6 +99,30 @@ public class KafkaFunctions { public static final int DEFAULT_MAX_WAIT = 5000; /** + * The key for the global property that defines how a message is returned + * from the set of KAFKA functions. + * + * <p>simple - The result contains only the message value as a string. + * <p>rich - The result contains the message value, topic, partition, and offset. + */ + public static final String MESSAGE_VIEW_PROPERTY = "stellar.kafka.message.view"; + + /** + * An acceptable value for the 'stellar.kafka.message.view' property. The result + * provided will contain only the message value as a string. + */ + public static final String MESSAGE_VIEW_SIMPLE = "simple"; + + /** + * An acceptable value for the 'stellar.kafka.message.view' property. + * + * <p>Provides a view of each message with more detailed metadata beyond just the + * message value. The result provided will contain the message value, topic, partition, + * and offset. + */ + public static final String MESSAGE_VIEW_RICH = "rich"; + + /** * The default set of Kafka properties. */ private static Properties defaultProperties = defaultKafkaProperties(); @@ -137,6 +162,12 @@ public class KafkaFunctions { * KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" }) * } * </pre> + * + * <p>By default, only the message value is returned. By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -202,7 +233,8 @@ public class KafkaFunctions { while(messages.size() < count && wait < maxWait) { for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { - messages.add(record.value()); + Object viewOfMessage = render(record, properties); + messages.add(viewOfMessage); } // how long have we waited? @@ -247,6 +279,12 @@ public class KafkaFunctions { * KAFKA_TAIL('topic', 10) * } * </pre> + * + * <p>By default, only the message value is returned. By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -312,7 +350,8 @@ public class KafkaFunctions { while(messages.size() < count && wait < maxWait) { for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { - messages.add(record.value()); + Object viewOfMessage = render(record, properties); + messages.add(viewOfMessage); } // how long have we waited? @@ -357,6 +396,7 @@ public class KafkaFunctions { * KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" }) * } * </pre> + * */ @Stellar( namespace = "KAFKA", @@ -394,9 +434,49 @@ public class KafkaFunctions { // send the messages Properties properties = buildKafkaProperties(overrides, context); - putMessages(topic, messages, properties); + List<RecordMetadata> records = putMessages(topic, messages, properties); - return null; + // render a view of the messages that were written for the user + Object view = render(records, properties); + return view; + } + + /** + * Render a view of the {@link RecordMetadata} that resulted from writing + * messages to Kafka. + * + * @param records The record metadata. + * @param properties The properties. + * @return + */ + private Object render(List<RecordMetadata> records, Properties properties) { + + Object view; + if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) { + + // build a 'rich' view of the messages that were written + List<Object> responses = new ArrayList<>(); + for(RecordMetadata record: records) { + + // render the 'rich' view of the record + Map<String, Object> richView = new HashMap<>(); + richView.put("topic", record.topic()); + richView.put("partition", record.partition()); + richView.put("offset", record.offset()); + richView.put("timestamp", record.timestamp()); + + responses.add(richView); + } + + // the rich view is a list of maps containing metadata about how each message was written + view = responses; + + } else { + + // otherwise, the view is simply a count of the number of messages written + view = CollectionUtils.size(records); + } + return view; } /** @@ -407,9 +487,11 @@ public class KafkaFunctions { * @param topic The topic to send messages to. * @param messages The messages to send. * @param properties The properties to use with Kafka. + * @return Metadata about all the records written to Kafka. */ - private void putMessages(String topic, List<String> messages, Properties properties) { + private List<RecordMetadata> putMessages(String topic, List<String> messages, Properties properties) { LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size()); + List<RecordMetadata> records = new ArrayList<>(); try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) { List<Future<RecordMetadata>> futures = new ArrayList<>(); @@ -422,11 +504,14 @@ public class KafkaFunctions { // wait for the sends to complete for(Future<RecordMetadata> future : futures) { - waitForResponse(future, properties); + RecordMetadata record = waitForResponse(future, properties); + records.add(record); } producer.flush(); } + + return records; } /** @@ -434,19 +519,23 @@ public class KafkaFunctions { * * @param future The future for the message being sent. * @param properties The configuration properties. - * @return + * @return Metadata about the record that was written to Kafka. */ - private void waitForResponse(Future<RecordMetadata> future, Properties properties) { + private RecordMetadata waitForResponse(Future<RecordMetadata> future, Properties properties) { + RecordMetadata record = null; int maxWait = getMaxWait(properties); + try { // wait for the record and then render it for the user - RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS); + record = future.get(maxWait, TimeUnit.MILLISECONDS); LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}", record.topic(), record.partition(), record.offset()); } catch(TimeoutException | InterruptedException | ExecutionException e) { LOG.error("KAFKA_PUT message send failure", e); } + + return record; } @Override @@ -528,6 +617,12 @@ public class KafkaFunctions { * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10) * } * </pre> + * + * <p>By default, only the message value is returned. By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -601,7 +696,8 @@ public class KafkaFunctions { // only keep the message if the filter expression is satisfied if(isSatisfied(filter, record.value())) { - messages.add(record.value()); + Object view = render(record, properties); + messages.add(view); // do we have enough messages already? if(messages.size() >= count) { @@ -667,6 +763,41 @@ public class KafkaFunctions { } /** + * Renders the Kafka record into a view. + * + * <p>A user can customize the way in which a Kafka record is rendered by altering + * the "stellar.kafka.message.view" property. + * + * @param record The Kafka record to render. + * @param properties The properties which allows a user to customize the rendered view of a record. + * @return + */ + private static Object render(ConsumerRecord<String, String> record, Properties properties) { + LOG.debug("Render message; topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + + Object result; + if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) { + // build the detailed view of the record + Map<String, Object> view = new HashMap<>(); + view.put("value", record.value()); + view.put("topic", record.topic()); + view.put("partition", record.partition()); + view.put("offset", record.offset()); + view.put("timestamp", record.timestamp()); + view.put("key", record.key()); + + result = view; + + } else { + // default to the simple view + result = record.value(); + } + + return result; + } + + /** * Manually assigns all partitions in a topic to a consumer * * @param topic The topic whose partitions will be assigned. @@ -756,6 +887,23 @@ public class KafkaFunctions { } /** + * Determines how Kafka messages should be rendered for the user. + * + * @param properties The properties. + * @return How the Kafka messages should be rendered. + */ + private static String getMessageView(Properties properties) { + // defaults to the simple view + String messageView = MESSAGE_VIEW_SIMPLE; + + if(properties.containsKey(MESSAGE_VIEW_PROPERTY)) { + messageView = ConversionUtils.convert(properties.get(MESSAGE_VIEW_PROPERTY), String.class); + } + + return messageView; + } + + /** * Defines a minimal set of default parameters that can be overridden * via the global properties. */ @@ -792,6 +940,9 @@ public class KafkaFunctions { // set the default poll timeout properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT); + // set the default message view + properties.put(MESSAGE_VIEW_PROPERTY, MESSAGE_VIEW_SIMPLE); + return properties; } http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index d82bb37..5e045ad 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -48,6 +48,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -153,7 +155,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { variables.put("topic", topicName); // put a message onto the topic - run("KAFKA_PUT(topic, [message1])"); + assertEquals(1, run("KAFKA_PUT(topic, [message1])")); + + // validate the message in the topic + assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic)")); + } + + /** + * KAFKA_PUT should be able to write multiple message to a topic. + */ + @Test + public void testKafkaPutMultipleMessages() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic + assertEquals(2, run("KAFKA_PUT(topic, [message1, message2])")); + + // validate the message in the topic + List<String> expected = new ArrayList<String>() {{ + add(message1); + add(message2); + }}; + assertEquals(expected, run("KAFKA_GET(topic, 2)")); + } + + /** + * KAFKA_PUT should be able to write a message passed as a String, rather than a List. + */ + @Test + public void testKafkaPutOneMessagePassedAsString() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic - the message is just a string, not a list + run("KAFKA_PUT(topic, message1)"); // get a message from the topic Object actual = run("KAFKA_GET(topic)"); @@ -166,7 +206,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { * KAFKA_PUT should be able to write a message passed as a String, rather than a List. */ @Test - public void testKafkaPutOneMessagePassedAsString() { + public void testKafkaPutWithRichView() { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic - the message is just a string, not a list + Object actual = run("KAFKA_PUT(topic, message1)"); + + // validate + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertEquals(topicName, view.get("topic")); + assertEquals(0, view.get("partition")); + assertEquals(0L, view.get("offset")); + assertNotNull(view.get("timestamp")); + + } + + /** + * KAFKA_GET should allow a user to see a detailed view of each Kafka record. + */ + @Test + public void testKafkaGetWithRichView() { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); // use a unique topic name for this test final String topicName = testName.getMethodName(); @@ -179,7 +252,17 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Object actual = run("KAFKA_GET(topic)"); // validate - assertEquals(Collections.singletonList(message1), actual); + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertEquals(0L, view.get("offset")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message1, view.get("value")); } /** @@ -300,6 +383,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_TAIL should allow a user to see a rich view of each Kafka record. + */ + @Test + public void testKafkaTailWithRichView() throws Exception { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic; KAFKA tail should NOT retrieve these + run("KAFKA_PUT(topic, [message2, message2, message2])"); + + // get a message from the topic; will block until messages arrive + Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)"); + + // put 10 messages onto the topic for KAFKA_TAIL to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])")); + + // wait for KAFKA_TAIL to complete + Object actual = tailFuture.get(10, TimeUnit.SECONDS); + + // validate + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message1, view.get("value")); + assertNotNull(view.get("offset")); + } + + /** * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer * * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user @@ -339,7 +461,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Map<String, String> properties = (Map<String, String>) run(expression); assertEquals(expected, properties.get(overriddenKey)); } - + /** * KAFKA_FIND should only return messages that satisfy a filter expression. */ @@ -385,6 +507,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_FIND should allow a user to see a detailed view of each Kafka record. + */ + @Test + public void testKafkaFindWithRichView() throws Exception { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // find all messages satisfying the filter expression + Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)"); + + // put 10 messages onto the topic for KAFKA_TAIL to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])")); + + // validate + Object actual = future.get(10, TimeUnit.SECONDS); + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertNotNull(view.get("offset")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message2, view.get("value")); + } + + /** * KAFKA_FIND should return no more messages than its limit. */ @Test @@ -491,4 +647,3 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } } } -