http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md index 8a53e71..c72970f 100644 --- a/metron-platform/metron-enrichment/README.md +++ b/metron-platform/metron-enrichment/README.md @@ -31,36 +31,22 @@ data format (e.g. a JSON Map structure with `original_message` and ## Enrichment Architecture -![Architecture](enrichment_arch.png) +![Unified Architecture](unified_enrichment_arch.svg) ### Unified Enrichment Topology -There is an experimental unified enrichment topology which is shipped. -Currently the architecture, as described above, has a split/join in -order to perform enrichments in parallel. This poses some issues in -terms of ease of tuning and reasoning about performance. - -In order to deal with these issues, there is an alternative enrichment topology which -uses data parallelism as opposed to the split/join task parallelism. -This architecture uses a worker pool to fully enrich any message within -a worker. This results in +The unified enrichment topology uses data parallelism as opposed to the deprecated +split/join topology's task parallelism. This architecture uses a worker pool to fully +enrich any message within a worker. This results in * Fewer bolts in the topology * Each bolt fully operates on a message. * Fewer network hops -![Unified Architecture](unified_enrichment_arch.svg) - -This architecture is fully backwards compatible; the only difference is -how the enrichment will operate on each message (in one bolt where the -split/join is done in a threadpool as opposed +This architecture is fully backwards compatible with the old split-join +topology; the only difference is how the enrichment will operate on each +message (in one bolt where the split/join is done in a threadpool as opposed to split across multiple bolts). 
-#### Using It - -In order to use this, you will need to -* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-unified.yaml` instead of `remote.yaml` -* Restart the enrichment topology. - #### Configuring It There are two parameters which you might want to tune in this topology. @@ -76,6 +62,19 @@ intel bolt, the configurations will be taken from the respective join bolt parallelism. When proper ambari support for this is added, we will add its own property. +### Split-Join Enrichment Topology + +The now-deprecated split/join topology is also available and performs enrichments in parallel. +This poses some issues in terms of ease of tuning and reasoning about performance. + +![Architecture](enrichment_arch.png) + +#### Using It + +In order to use the older, deprecated topology, you will need to +* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-splitjoin.yaml` instead of `remote-unified.yaml` +* Restart the enrichment topology. + ## Enrichment Configuration The configuration for the `enrichment` topology, the topology primarily @@ -85,7 +84,6 @@ defined by JSON documents stored in zookeeper. There are two types of configurations at the moment, `global` and `sensor` specific. - ## Global Configuration There are a few enrichments which have independent configurations, such @@ -134,7 +132,6 @@ The configuration is a complex JSON object with the following top level fields: ### The `enrichment` Configuration - | Field | Description | Example | |------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------| | `fieldToTypeMap` | In the case of a simple HBase enrichment (i.e. 
a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. Note: applies to hbaseEnrichment only. | `"fieldToTypeMap" : { "ip_src_addr" : [ "asset_enrichment" ] }` |
http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 0677453..e0f1f0c 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -271,6 +271,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { @Override public void cleanup() { + super.cleanup(); adapter.cleanup(); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java index cfa101d..fcfa918 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java @@ -17,16 +17,12 @@ */ package org.apache.metron.enrichment.bolt; -import com.google.common.base.Joiner; import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; import 
org.apache.metron.common.configuration.enrichment.handler.ConfigHandler; -import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore; -import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore; -import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.utils.MessageUtils; import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; @@ -34,8 +30,6 @@ import org.apache.metron.enrichment.utils.ThreatIntelUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; -import org.apache.metron.stellar.common.utils.ConversionUtils; -import org.apache.metron.threatintel.triage.ThreatTriageProcessor; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh index 77c3a77..d3ed8ad 100755 --- a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh +++ b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh @@ -20,11 +20,11 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -# there are two enrichment topologies. by default, the split-join enrichment topology is executed +# There are two enrichment topologies. By default, the unified enrichment topology is executed. Split-join is now deprecated. 
SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties" UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties" # by passing in different args, the user can execute an alternative enrichment topology -ARGS=${@:-$SPLIT_JOIN_ARGS} +ARGS=${@:-$UNIFIED_ARGS} storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 588fc58..083628c 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -207,7 +207,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { } UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR); verify(outputCollector, times(5)).ack(tuple); - verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class)); + verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class)); verify(outputCollector, times(1)).reportError(any(Throwable.class)); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 2e1968a..cfe5752 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -46,11 +46,11 @@ import org.junit.rules.ExpectedException; public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "ttl": "data 1", "guid":"bro_1"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "ttl": "data 2", "guid":"bro_2"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "ttl": "data 3", "guid":"bro_3"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "ttl": "data 4", "guid":"bro_4"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "ttl": "data 5", "guid":"bro_5"} + * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, 
"timestamp":1, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 1", "ttl": "data 1", "guid":"bro_1"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, "is_alert":false, "location_point": "48.5839,7.7455", "method": "bro data 2", "ttl": "data 2", "guid":"bro_2"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, "is_alert":true, "location_point": "50.0,7.7455", "method": "bro data 3", "ttl": "data 3", "guid":"bro_3"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 4", "ttl": "data 4", "guid":"bro_4"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, "is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 5", "ttl": "data 5", "guid":"bro_5"} * ] */ @Multiline @@ -58,11 +58,11 @@ public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "ttl": 1, "guid":"snort_1", "threat:triage:score":10.0}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "ttl": 2, "guid":"snort_2", "threat:triage:score":20.0}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "ttl": 3, "guid":"snort_3"}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "ttl": 4, "guid":"snort_4"}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "ttl": 5, "guid":"snort_5"} + * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "sig_generator": "sig_generator 1", "ttl": 1, "guid":"snort_1", "threat:triage:score":10.0}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "sig_generator": "sig_generator 2", "ttl": 2, "guid":"snort_2", "threat:triage:score":20.0}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "sig_generator": "sig_generator 3", "ttl": 3, "guid":"snort_3"}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "sig_generator": "sig_generator 4", "ttl": 4, "guid":"snort_4"}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "sig_generator": "sig_generator 5", "ttl": 5, 
"guid":"snort_5"} * ] */ @Multiline @@ -272,7 +272,7 @@ public abstract class SearchIntegrationTest { /** * { - * "facetFields": ["snort_field"], + * "facetFields": ["sig_generator"], * "indices": ["bro", "snort"], * "query": "*:*", * "from": 0, @@ -698,14 +698,14 @@ public abstract class SearchIntegrationTest { Map<String, Map<String, Long>> facetCounts = response.getFacetCounts(); Assert.assertEquals(1, facetCounts.size()); - Map<String, Long> snortFieldCounts = facetCounts.get("snort_field"); + Map<String, Long> snortFieldCounts = facetCounts.get("sig_generator"); Assert.assertEquals(5, snortFieldCounts.size()); - Assert.assertEquals(1L, snortFieldCounts.get("50").longValue()); - Assert.assertEquals(1L, snortFieldCounts.get("40").longValue()); - Assert.assertEquals(1L, snortFieldCounts.get("30").longValue()); - Assert.assertEquals(1L, snortFieldCounts.get("20").longValue()); - Assert.assertEquals(1L, snortFieldCounts.get("10").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 5").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 4").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 3").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 2").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 1").longValue()); response.getFacetCounts(); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-integration-test/src/main/sample/patterns/test ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/sample/patterns/test b/metron-platform/metron-integration-test/src/main/sample/patterns/test index a88a255..ebbf9c4 100644 --- a/metron-platform/metron-integration-test/src/main/sample/patterns/test +++ b/metron-platform/metron-integration-test/src/main/sample/patterns/test @@ -1,2 +1,3 @@ YAF_TIME_FORMAT 
%{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T ]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED} YAF_DELIMITED %{NUMBER:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:protocol}\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\|%{SPACE:UNWANTED}%{INT:ip_src_port}\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason} +ELBACCESSLOGS %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport} (?:(%{IP:backendip}:?:%{INT:backendport})|-) %{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} %{NUMBER:response_processing_time} (?:-|%{INT:elb_status_code}) (?:-|%{INT:backend_status_code}) %{INT:received_bytes} %{INT:sent_bytes} \"(?:-|(?:%{WORD:verb} %{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATH:path}(?:%{URIPARAM:params})?)?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest}))\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol}) http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 381b0ee..cfcf6ed 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -29,10 +29,12 @@ There are two general types types of parsers: * Grok parser: `org.apache.metron.parsers.GrokParser` with 
possible `parserConfig` entries of * `grokPath` : The path in HDFS (or in the Jar) to the grok statement * `patternLabel` : The pattern label to use from the grok statement + * `multiLine` : The raw data passed in should be handled as a long with multiple lines, with each line to be parsed separately. This setting's valid values are 'true' or 'false'. The default if unset is 'false'. When set the parser will handle multiple lines with successfully processed lines emitted normally, and lines with errors sent to the error topic. * `timestampField` : The field to use for timestamp * `timeFields` : A list of fields to be treated as time * `dateFormat` : The date format to use to parse the time fields * `timezone` : The timezone to use. `UTC` is default. + * The Grok parser supports either 1 line to parse per incoming message, or incoming messages with multiple log lines, and will produce a json message per line * CSV Parser: `org.apache.metron.parsers.csv.CSVParser` with possible `parserConfig` entries of * `timestampFormat` : The date format of the timestamp to use. If unspecified, the parser assumes the timestamp is ms since unix epoch. * `columns` : A map of column names you wish to extract from the CSV to their offsets (e.g. `{ 'name' : 1, 'profession' : 3}` would be a column map for extracting the 2nd and 4th columns from a CSV) @@ -513,9 +515,13 @@ Java parser adapters are intended for higher-velocity topologies and are not eas * org.apache.metron.parsers.syslog.Syslog5424Parser : Parse Syslog RFC 5424 messages ### Grok Parser Adapters -Grok parser adapters are designed primarly for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies. Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topplogy does not need to be recombiled in order to make changes to them. 
An example of a Grok perser is: +Grok parser adapters are designed primarily for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies. Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topology does not need to be recompiled in order to make changes to them. Examples of Grok parsers are: -* org.apache.metron.parsers.GrokParser +* org.apache.metron.parsers.GrokParser and org.apache.metron.parsers.websphere.GrokWebSphereParser + +Parsers that derive from GrokParser typically allow the GrokParser to parse the messages, and then override the methods for postParse to do further parsing. +When this is the case, and the Parser has not overridden `parse(byte[])` or `parseOptionalResult(byte[])` these parsers will gain support for treating byte[] input as multiple lines, with each line parsed as a separate message (and returned as such). +This is enabled by using the `"multiLine":"true"` Parser configuration option. For more information on the Grok project please refer to the following link: http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/message-parser-implementation-notes.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/message-parser-implementation-notes.md b/metron-platform/metron-parsers/message-parser-implementation-notes.md new file mode 100644 index 0000000..b8afe04 --- /dev/null +++ b/metron-platform/metron-parsers/message-parser-implementation-notes.md @@ -0,0 +1,57 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +# `MessageParser` implementation notes + + +1. Supporting multiple JSONObject returns from a single byte[] +The original `MessageParser` interface supported parsing a message and returning a `List<JSONObject>`, thereby explicitly supporting multiple messages from one input. +While this is fine, it only allows for the complete failure of a message for any reason. There can only be one exception thrown. This means that if there _are_ multiple messages in the buffer, any one failure will necessarily fail all of them. +To improve on this situation, a new method was added to the `MessageParser` interface (with a default implementation), that introduces a return type to provide not only the JSONObjects produced, but also a `Map` of messages -> throwable. 
+ +To support this in your parser, you should: + +- Implement the new method + +```java + @Override + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) +``` + +- Implement the original `List<JSONObject> parse(byte[] message)` to delegate to that method such as below: + +```java + @Override + public List<JSONObject> parse(byte[] rawMessage) { + Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage); + if (!resultOptional.isPresent()) { + return Collections.EMPTY_LIST; + } + Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables(); + if (!errors.isEmpty()) { + throw new RuntimeException(errors.entrySet().iterator().next().getValue()); + } + + return resultOptional.get().getMessages(); + } +``` + +- You *may* want to govern treating the incoming buffer as multiline or not by adding a configuration option for your parser, such as `"multiline":"true"|"false"` + +- See the org.apache.metron.parsers.GrokParser for an example of this implementation. + +The Metron system itself will call the new `parseOptionalResult` method during processing. The default implementation in the interface handles backwards compatibility with previous implementations. http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java new file mode 100644 index 0000000..11d15eb --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers; + +import org.apache.metron.parsers.interfaces.MessageParserResult; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class DefaultMessageParserResult<T> implements MessageParserResult<T> { + private List<T> messages = new ArrayList<>(); + private Map<Object, Throwable> errors = new HashMap<>(); + private Throwable masterThrowable; + + public DefaultMessageParserResult() { + } + + public DefaultMessageParserResult(Throwable masterThrowable) { + this.masterThrowable = masterThrowable; + } + + public DefaultMessageParserResult(List<T> list) { + messages.addAll(list); + } + + public DefaultMessageParserResult(Map<Object, Throwable> map) { + errors.putAll(map); + } + + public DefaultMessageParserResult(List<T> list, Map<Object, Throwable> map) { + messages.addAll(list); + errors.putAll(map); + } + + public void addMessage(T message) { + messages.add(message); + } + + public void addError(Object message, Throwable throwable) { + errors.put(message, throwable); + } + + @Override + public List<T> getMessages() { + return messages; + } + + @Override + public Map<Object, Throwable> getMessageThrowables() { + return errors; + } + + @Override + public 
Optional<Throwable> getMasterThrowable() { + return Optional.ofNullable(masterThrowable); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java new file mode 100644 index 0000000..79a9b5d --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers; + +import org.apache.metron.common.error.MetronError; +import org.json.simple.JSONObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Default implementation of ParserRunnerResults. 
+ */ +public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> { + + private List<JSONObject> messages = new ArrayList<>(); + private List<MetronError> errors = new ArrayList<>(); + + public List<JSONObject> getMessages() { + return messages; + } + + public List<MetronError> getErrors() { + return errors; + } + + public void addMessage(JSONObject message) { + this.messages.add(message); + } + + public void addError(MetronError error) { + this.errors.add(error); + } + + public void addErrors(List<MetronError> errors) { + this.errors.addAll(errors); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ParserRunnerResults parserResult = (ParserRunnerResults) o; + return Objects.equals(messages, parserResult.getMessages()) && + Objects.equals(errors, parserResult.getErrors()); + } + + @Override + public int hashCode() { + int result = messages != null ? messages.hashCode() : 0; + result = 31 * result + (errors != null ? errors.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index 99ac390..6bdfb81 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -15,32 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.metron.parsers; import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import oi.thekraken.grok.api.Grok; +import oi.thekraken.grok.api.Match; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.metron.common.Constants; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; +import java.io.StringReader; import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; -import oi.thekraken.grok.api.Grok; -import oi.thekraken.grok.api.Match; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.metron.common.Constants; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + public class GrokParser implements MessageParser<JSONObject>, Serializable { @@ -48,6 +59,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable { protected transient Grok grok; protected String grokPath; + 
protected boolean multiLine = false; protected String patternLabel; protected List<String> timeFields = new ArrayList<>(); protected String timestampField; @@ -55,8 +67,13 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable { protected String patternsCommonDir = "/patterns/common"; @Override + @SuppressWarnings("unchecked") public void configure(Map<String, Object> parserConfig) { this.grokPath = (String) parserConfig.get("grokPath"); + String multiLineString = (String) parserConfig.get("multiLine"); + if (!StringUtils.isBlank(multiLineString)) { + multiLine = Boolean.parseBoolean(multiLineString); + } this.patternLabel = (String) parserConfig.get("patternLabel"); this.timestampField = (String) parserConfig.get("timestampField"); List<String> timeFieldsParam = (List<String>) parserConfig.get("timeFields"); @@ -126,11 +143,73 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable { @SuppressWarnings("unchecked") @Override - public List<JSONObject> parse(byte[] rawMessage) { + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { if (grok == null) { init(); } + if (multiLine) { + return parseMultiLine(rawMessage); + } + return parseSingleLine(rawMessage); + } + + @SuppressWarnings("unchecked") + private Optional<MessageParserResult<JSONObject>> parseMultiLine(byte[] rawMessage) { List<JSONObject> messages = new ArrayList<>(); + Map<Object,Throwable> errors = new HashMap<>(); + String originalMessage = null; + // read the incoming raw data as if it may have multiple lines of logs + // if there is only one line, it will just get processed. 
+ try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) { + while ((originalMessage = reader.readLine()) != null) { + LOG.debug("Grok parser parsing message: {}", originalMessage); + try { + Match gm = grok.match(originalMessage); + gm.captures(); + JSONObject message = new JSONObject(); + message.putAll(gm.toMap()); + + if (message.size() == 0) { + Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: " + + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + + grokPath); + errors.put(originalMessage, rte); + continue; + } + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } + } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + errors.put(originalMessage, e); + } + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + Exception innerException = new IllegalStateException("Grok parser Error: " + + e.getMessage() + + " on " + + originalMessage, e); + return Optional.of(new DefaultMessageParserResult<>(innerException)); + } + return Optional.of(new DefaultMessageParserResult<>(messages, errors)); + } + + @SuppressWarnings("unchecked") + private Optional<MessageParserResult<JSONObject>> parseSingleLine(byte[] rawMessage) { + List<JSONObject> messages = new ArrayList<>(); + Map<Object,Throwable> errors = new HashMap<>(); String originalMessage = null; try { originalMessage = new String(rawMessage, "UTF-8"); @@ -140,30 +219,36 @@ public class GrokParser implements 
MessageParser<JSONObject>, Serializable { JSONObject message = new JSONObject(); message.putAll(gm.toMap()); - if (message.size() == 0) - throw new RuntimeException("Grok statement produced a null message. Original message was: " + if (message.size() == 0) { + Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: " + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + grokPath); - - message.put("original_string", originalMessage); - for (String timeField : timeFields) { - String fieldValue = (String) message.get(timeField); - if (fieldValue != null) { - message.put(timeField, toEpoch(fieldValue)); + errors.put(originalMessage, rte); + } else { + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); } - if (timestampField != null) { - message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); - } - message.remove(patternLabel); - postParse(message); - messages.add(message); - LOG.debug("Grok parser parsed message: {}", message); } catch (Exception e) { LOG.error(e.getMessage(), e); - throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e); + Exception innerException = new IllegalStateException("Grok parser Error: " + + e.getMessage() + + " on " + + originalMessage, e); + return Optional.of(new DefaultMessageParserResult<>(innerException)); } - return messages; + return Optional.of(new DefaultMessageParserResult<JSONObject>(messages, errors)); } @Override 
http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java new file mode 100644 index 0000000..f9123b1 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.stellar.dsl.Context; + +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; + +/** + * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser. + * The information needed to initialize a MessageParser is supplied by the parser config supplier. 
After the parsers + * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object + * that contains a list of parsed messages and/or a list of errors. + * @param <T> The type of a successfully parsed message. + */ +public interface ParserRunner<T> { + + /** + * Return a list of all sensor types that can be parsed with this ParserRunner. + * @return Sensor types + */ + Set<String> getSensorTypes(); + + /** + * + * @param parserConfigSupplier Supplies parser configurations + * @param stellarContext Stellar context used to apply Stellar functions during field transformations + */ + void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext); + + /** + * Parses a message and either returns the message or an error. + * @param sensorType Sensor type of the message + * @param rawMessage Raw message including metadata + * @param parserConfigurations Parser configurations + * @return ParserRunnerResults containing a list of messages and a list of errors + */ + ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java new file mode 100644 index 0000000..a986db7 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.FieldTransformer; +import org.apache.metron.common.configuration.FieldValidator; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.common.utils.ReflectionUtils; +import org.apache.metron.parsers.filters.Filters; +import org.apache.metron.parsers.interfaces.MessageFilter; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.topology.ParserComponent; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Supplier; +import 
java.util.stream.Collectors; + +/** + * The default implementation of a ParserRunner. + */ +public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable { + + class ProcessResult { + + private JSONObject message; + private MetronError error; + + public ProcessResult(JSONObject message) { + this.message = message; + } + + public ProcessResult(MetronError error) { + this.error = error; + } + + public JSONObject getMessage() { + return message; + } + + public MetronError getError() { + return error; + } + + public boolean isError() { + return error != null; + } + } + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + protected transient Consumer<ParserRunnerResults> onSuccess; + protected transient Consumer<MetronError> onError; + + private HashSet<String> sensorTypes; + private Map<String, ParserComponent> sensorToParserComponentMap; + + // Stellar variables + private transient Context stellarContext; + + public ParserRunnerImpl(HashSet<String> sensorTypes) { + this.sensorTypes = sensorTypes; + } + + public Map<String, ParserComponent> getSensorToParserComponentMap() { + return sensorToParserComponentMap; + } + + public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) { + this.sensorToParserComponentMap = sensorToParserComponentMap; + } + + public Context getStellarContext() { + return stellarContext; + } + + @Override + public Set<String> getSensorTypes() { + return sensorTypes; + } + + @Override + public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) { + if (parserConfigSupplier == null) { + throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner."); + } + if (stellarContext == null) { + throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner."); + } + this.stellarContext = stellarContext; + 
initializeParsers(parserConfigSupplier); + } + + /** + * Parses messages with the appropriate MessageParser based on sensor type. The resulting list of messages are then + * post-processed and added to the ParserRunnerResults message list. Any errors that happen during post-processing are + * added to the ParserRunnerResults error list. Any exceptions (including a master exception) thrown by the MessageParser + * are also added to the ParserRunnerResults error list. + * + * @param sensorType Sensor type of the message + * @param rawMessage Raw message including metadata + * @param parserConfigurations Parser configurations + * @return ParserRunnerResults containing a list of messages and a list of errors + */ + @Override + public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) { + DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + if (sensorParserConfig != null) { + MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser(); + Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage()); + if (optionalMessageParserResult.isPresent()) { + MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get(); + + // Process each message returned from the MessageParser + messageParserResult.getMessages().forEach(message -> { + Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations); + if (processResult.isPresent()) { + if (processResult.get().isError()) { + parserRunnerResults.addError(processResult.get().getError()); + } else { + parserRunnerResults.addMessage(processResult.get().getMessage()); + } + } + }); + + // If a master exception is thrown by the MessageParser, wrap it with a 
MetronError and add it to the list of errors + messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(throwable) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(rawMessage.getMessage()))); + + // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors + parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(entry.getValue()) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(entry.getKey())).collect(Collectors.toList())); + } + } else { + throw new IllegalStateException(String.format("Could not execute parser. Cannot find configuration for sensor %s.", + sensorType)); + } + return parserRunnerResults; + } + + /** + * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner. Objects are created + * using reflection and the MessageParser configure and init methods are called. + * @param parserConfigSupplier Parser configurations + */ + private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) { + LOG.info("Initializing parsers..."); + sensorToParserComponentMap = new HashMap<>(); + for(String sensorType: sensorTypes) { + if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) { + throw new IllegalStateException(String.format("Could not initialize parsers. 
Cannot find configuration for sensor %s.", + sensorType)); + } + + SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType); + + LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ", + sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName()); + + // create message parser + MessageParser<JSONObject> parser = ReflectionUtils + .createInstance(parserConfig.getParserClassName()); + + // create message filter + MessageFilter<JSONObject> filter = null; + parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext); + if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) { + filter = Filters.get( + parserConfig.getFilterClassName(), + parserConfig.getParserConfig() + ); + } + + parser.configure(parserConfig.getParserConfig()); + parser.init(); + sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter)); + } + } + + /** + * Post-processes parsed messages by: + * <ul> + * <li>Applying field transformations defined in the sensor parser config</li> + * <li>Filtering messages using the configured MessageFilter class</li> + * <li>Validating messages using the MessageParser validate method</li> + * </ul> + * If a message is successfully processed a message is returned in a ProcessResult. If a message fails + * validation, a MetronError object is created and returned in a ProcessResult. If a message is + * filtered out an empty Optional is returned. 
+ * + * @param sensorType Sensor type of the message + * @param message Message parsed by the MessageParser + * @param rawMessage Raw message including metadata + * @param parser MessageParser for the sensor type + * @param parserConfigurations Parser configurations + */ + @SuppressWarnings("unchecked") + protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage, + MessageParser<JSONObject> parser, + ParserConfigurations parserConfigurations + ) { + Optional<ProcessResult> processResult = Optional.empty(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + sensorParserConfig.getRawMessageStrategy().mergeMetadata( + message, + rawMessage.getMetadata(), + sensorParserConfig.getMergeMetadata(), + sensorParserConfig.getRawMessageStrategyConfig() + ); + message.put(Constants.SENSOR_TYPE, sensorType); + applyFieldTransformations(message, rawMessage, sensorParserConfig); + if (!message.containsKey(Constants.GUID)) { + message.put(Constants.GUID, UUID.randomUUID().toString()); + } + MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter(); + if (filter == null || filter.emit(message, stellarContext)) { + boolean isInvalid = !parser.validate(message); + List<FieldValidator> failedValidators = null; + if (!isInvalid) { + failedValidators = getFailedValidators(message, parserConfigurations); + isInvalid = !failedValidators.isEmpty(); + } + if (isInvalid) { + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(message); + Set<String> errorFields = failedValidators == null ? 
null : failedValidators.stream() + .flatMap(fieldValidator -> fieldValidator.getInput().stream()) + .collect(Collectors.toSet()); + if (errorFields != null && !errorFields.isEmpty()) { + error.withErrorFields(errorFields); + } + processResult = Optional.of(new ProcessResult(error)); + } else { + processResult = Optional.of(new ProcessResult(message)); + } + } + return processResult; + } + + /** + * Applies Stellar field transformations defined in the sensor parser config. + * @param message Message parsed by the MessageParser + * @param rawMessage Raw message including metadata + * @param sensorParserConfig Sensor parser config + */ + private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) { + for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { + if (handler != null) { + if (!sensorParserConfig.getMergeMetadata()) { + //if we haven't merged metadata, then we need to pass them along as configuration params. 
+ handler.transformAndUpdate( + message, + stellarContext, + sensorParserConfig.getParserConfig(), + rawMessage.getMetadata() + ); + } else { + handler.transformAndUpdate( + message, + stellarContext, + sensorParserConfig.getParserConfig() + ); + } + } + } + } + + private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) { + List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations(); + List<FieldValidator> failedValidators = new ArrayList<>(); + for(FieldValidator validator : fieldValidations) { + if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) { + failedValidators.add(validator); + } + } + return failedValidators; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java new file mode 100644 index 0000000..7ca853c --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.metron.common.error.MetronError; + +import java.util.List; + +/** + * Container for the results of parsing a message with a ParserRunner. + * @param <T> The type of a successfully parsed message. + */ +public interface ParserRunnerResults<T> { + + List<T> getMessages(); + + List<MetronError> getErrors(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 213d02c..a9ee305 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,28 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.metron.parsers.bolt; +package org.apache.metron.parsers.bolt; -import com.github.benmanes.caffeine.cache.Cache; import java.io.Serializable; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; import java.util.function.Function; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; + +import com.github.benmanes.caffeine.cache.Cache; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; -import org.apache.metron.common.configuration.FieldTransformer; -import org.apache.metron.common.configuration.FieldValidator; +import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; @@ -45,10 +38,8 @@ import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.message.metadata.RawMessage; import org.apache.metron.common.message.metadata.RawMessageUtil; import org.apache.metron.common.utils.ErrorUtils; -import org.apache.metron.parsers.filters.Filters; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.apache.metron.parsers.topology.ParserComponents; +import org.apache.metron.parsers.ParserRunner; +import org.apache.metron.parsers.ParserRunnerResults; import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; @@ -71,26 +62,27 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private OutputCollector collector; - private Map<String, ParserComponents> sensorToComponentMap; + private ParserRunner<JSONObject> parserRunner; + private Map<String, WriterHandler> sensorToWriterMap; private Map<String, String> topicToSensorMap = new HashMap<>(); - private Context stellarContext; private transient MessageGetStrategy messageGetStrategy; - private transient Cache<CachingStellarProcessor.Key, Object> cache; private int requestedTickFreqSecs; private int defaultBatchTimeout; private int batchTimeoutDivisor = 1; public ParserBolt( String zookeeperUrl - , Map<String, ParserComponents> sensorToComponentMap + , ParserRunner parserRunner + , Map<String, WriterHandler> sensorToWriterMap ) { super(zookeeperUrl); - this.sensorToComponentMap = sensorToComponentMap; + this.parserRunner = parserRunner; + this.sensorToWriterMap = sensorToWriterMap; // Ensure that all sensors are either bulk sensors or not bulk sensors. Can't mix and match. 
Boolean handleAcks = null; - for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) { - boolean writerHandleAck = entry.getValue().getWriter().handleAck(); + for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) { + boolean writerHandleAck = entry.getValue().handleAck(); if (handleAcks == null) { handleAcks = writerHandleAck; } else if (!handleAcks.equals(writerHandleAck)) { @@ -126,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { /** * Used only for unit testing - * @param defaultBatchTimeout */ - protected void setDefaultBatchTimeout(int defaultBatchTimeout) { - this.defaultBatchTimeout = defaultBatchTimeout; + public int getBatchTimeoutDivisor() { + return batchTimeoutDivisor; + } + + /** + * Used only for unit testing + */ + protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) { + this.sensorToWriterMap = sensorToWriterMap; + } + + /** + * Used only for unit testing + */ + protected Map<String, String> getTopicToSensorMap() { + return topicToSensorMap; + } + + /** + * Used only for unit testing + */ + protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) { + this.topicToSensorMap = topicToSensorMap; } /** * Used only for unit testing */ - public int getDefaultBatchTimeout() { - return defaultBatchTimeout; + public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) { + this.messageGetStrategy = messageGetStrategy; } - public Map<String, ParserComponents> getSensorToComponentMap() { - return sensorToComponentMap; + /** + * Used only for unit testing + */ + public void setOutputCollector(OutputCollector collector) { + this.collector = collector; } /** @@ -155,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { // to get the valid WriterConfiguration. But don't store any non-serializable objects, // else Storm will throw a runtime error. 
Function<WriterConfiguration, WriterConfiguration> configurationXform; - WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter(); + WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue(); if (writer.isWriterToBulkWriter()) { configurationXform = WriterToBulkWriter.TRANSFORMATION; } else { @@ -185,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { super.prepare(stormConf, context, collector); messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get(); this.collector = collector; - - // Build the Stellar cache - Map<String, Object> cacheConfig = new HashMap<>(); - for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) { - String sensor = entry.getKey(); - SensorParserConfig config = getSensorParserConfig(sensor); - - if (config != null) { - cacheConfig.putAll(config.getCacheConfig()); - } - } - cache = CachingStellarProcessor.createCache(cacheConfig); + this.parserRunner.init(this::getConfigurations, initializeStellar()); // Need to prep all sensors - for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) { + for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) { String sensor = entry.getKey(); - MessageParser<JSONObject> parser = entry.getValue().getMessageParser(); - - initializeStellar(); - if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) { - getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext); - if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) { - MessageFilter<JSONObject> filter = Filters.get( - getSensorParserConfig(sensor).getFilterClassName(), - getSensorParserConfig(sensor).getParserConfig() - ); - getSensorToComponentMap().get(sensor).setFilter(filter); - } - } - - parser.init(); SensorParserConfig config = getSensorParserConfig(sensor); if (config 
!= null) { @@ -225,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { throw new IllegalStateException( "Unable to retrieve a parser config for " + sensor); } - parser.configure(config.getParserConfig()); - WriterHandler writer = sensorToComponentMap.get(sensor).getWriter(); + WriterHandler writer = sensorToWriterMap.get(sensor); writer.init(stormConf, context, collector, getConfigurations()); if (defaultBatchTimeout == 0) { //This means getComponentConfiguration was never called to initialize defaultBatchTimeout, @@ -242,169 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { } } - protected void initializeStellar() { - Context.Builder builder = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) - .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) - ; - if(cache != null) { - builder = builder.with(Context.Capabilities.CACHE, () -> cache); - } - this.stellarContext = builder.build(); - StellarFunctions.initialize(stellarContext); - } - @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { if (TupleUtils.isTick(tuple)) { - try { - for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) { - entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy); - } - } catch (Exception e) { - throw new RuntimeException( - "This should have been caught in the writerHandler. 
If you see this, file a JIRA", e); - } finally { - collector.ack(tuple); - } + handleTickTuple(tuple); return; } - byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple); + String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); + String sensorType = topicToSensorMap.get(topic); try { - SensorParserConfig sensorParserConfig; - MessageParser<JSONObject> parser; - String sensor; - Map<String, Object> metadata; - if (sensorToComponentMap.size() == 1) { - // There's only one parser, so grab info directly - Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator() - .next(); - sensor = sensorParser.getKey(); - parser = sensorParser.getValue().getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } else { - // There's multiple parsers, so pull the topic from the Tuple and look up the sensor - String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); - sensor = topicToSensorMap.get(topic); - parser = sensorToComponentMap.get(sensor).getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } + ParserConfigurations parserConfigurations = getConfigurations(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy() + , tuple + , originalMessage + , sensorParserConfig.getReadMetadata() + , sensorParserConfig.getRawMessageStrategyConfig() + ); + ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations); + long numWritten = parserRunnerResults.getMessages().stream() + .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector)) + .filter(result -> result) + .count(); + parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error)); - List<FieldValidator> fieldValidations = 
getConfigurations().getFieldValidations(); - boolean ackTuple = false; - int numWritten = 0; - if (sensorParserConfig != null) { - RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy() - , tuple - , originalMessage - , sensorParserConfig.getReadMetadata() - , sensorParserConfig.getRawMessageStrategyConfig() - ); - metadata = rawMessage.getMetadata(); - - Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage()); - for (JSONObject message : messages.orElse(Collections.emptyList())) { - //we want to ack the tuple in the situation where we have are not doing a bulk write - //otherwise we want to defer to the writerComponent who will ack on bulk commit. - WriterHandler writer = sensorToComponentMap.get(sensor).getWriter(); - ackTuple = !writer.handleAck(); - - sensorParserConfig.getRawMessageStrategy().mergeMetadata( - message, - metadata, - sensorParserConfig.getMergeMetadata(), - sensorParserConfig.getRawMessageStrategyConfig() - ); - message.put(Constants.SENSOR_TYPE, sensor); - - for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { - if (handler != null) { - if (!sensorParserConfig.getMergeMetadata()) { - //if we haven't merged metadata, then we need to pass them along as configuration params. 
- handler.transformAndUpdate( - message, - stellarContext, - sensorParserConfig.getParserConfig(), - metadata - ); - } else { - handler.transformAndUpdate( - message, - stellarContext, - sensorParserConfig.getParserConfig() - ); - } - } - } - if (!message.containsKey(Constants.GUID)) { - message.put(Constants.GUID, UUID.randomUUID().toString()); - } - - MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter(); - if (filter == null || filter.emitTuple(message, stellarContext)) { - boolean isInvalid = !parser.validate(message); - List<FieldValidator> failedValidators = null; - if (!isInvalid) { - failedValidators = getFailedValidators(message, fieldValidations); - isInvalid = !failedValidators.isEmpty(); - } - if (isInvalid) { - MetronError error = new MetronError() - .withErrorType(Constants.ErrorType.PARSER_INVALID) - .withSensorType(Collections.singleton(sensor)) - .addRawMessage(message); - Set<String> errorFields = failedValidators == null ? null : failedValidators.stream() - .flatMap(fieldValidator -> fieldValidator.getInput().stream()) - .collect(Collectors.toSet()); - if (errorFields != null && !errorFields.isEmpty()) { - error.withErrorFields(errorFields); - } - ErrorUtils.handleError(collector, error); - } else { - numWritten++; - writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy); - } - } - } - } //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer //(meaning that none of the messages are valid either globally or locally) //then we want to handle the ack ourselves. 
- if (ackTuple || numWritten == 0) { + if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) { collector.ack(tuple); } } catch (Throwable ex) { - handleError(originalMessage, tuple, ex, collector); + handleError(sensorType, originalMessage, tuple, ex, collector); + collector.ack(tuple); + } + } + + protected Context initializeStellar() { + Map<String, Object> cacheConfig = new HashMap<>(); + for (String sensorType: this.parserRunner.getSensorTypes()) { + SensorParserConfig config = getSensorParserConfig(sensorType); + + if (config != null) { + cacheConfig.putAll(config.getCacheConfig()); + } + } + Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig); + + Context.Builder builder = new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) + .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) + ; + if(cache != null) { + builder = builder.with(Context.Capabilities.CACHE, () -> cache); + } + Context stellarContext = builder.build(); + StellarFunctions.initialize(stellarContext); + return stellarContext; + } + + protected void handleTickTuple(Tuple tuple) { + try { + for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) { + entry.getValue().flush(getConfigurations(), messageGetStrategy); + } + } catch (Exception e) { + throw new RuntimeException( + "This should have been caught in the writerHandler. 
If you see this, file a JIRA", e); + } finally { + collector.ack(tuple); } } - protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) { + WriterHandler writer = sensorToWriterMap.get(sensorType); + try { + writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy); + return true; + } catch (Exception ex) { + handleError(sensorType, originalMessage, tuple, ex, collector); + return false; + } + } + + protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_ERROR) .withThrowable(ex) - .withSensorType(sensorToComponentMap.keySet()) + .withSensorType(Collections.singleton(sensorType)) .addRawMessage(originalMessage); ErrorUtils.handleError(collector, error); - collector.ack(tuple); - } - - private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) { - List<FieldValidator> failedValidators = new ArrayList<>(); - for(FieldValidator validator : validators) { - if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) { - failedValidators.add(validator); - } - } - return failedValidators; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(Constants.ERROR_STREAM, new Fields("message")); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java index 9cdafa3..1fa1feb 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java @@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{ */ @Override - public boolean emitTuple(JSONObject message, Context context) { + public boolean emit(JSONObject message, Context context) { String protocol = (String) message.get(_key); return _known_protocols.contains(protocol); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java index 15a035a..8300ff4 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java @@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> { } @Override - public boolean emitTuple(JSONObject message, Context context) { + public boolean emit(JSONObject message, Context context) { VariableResolver resolver = new MapVariableResolver(message); return processor.parse(query, resolver, functionResolver, context); }