http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/README.md 
b/metron-platform/metron-enrichment/README.md
index 8a53e71..c72970f 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -31,36 +31,22 @@ data format (e.g. a JSON Map structure with 
`original_message` and
 
 ## Enrichment Architecture
 
-![Architecture](enrichment_arch.png)
+![Unified Architecture](unified_enrichment_arch.svg)
 
 ### Unified Enrichment Topology
 
-There is an experimental unified enrichment topology which is shipped.
-Currently the architecture, as described above, has a split/join in
-order to perform enrichments in parallel.  This poses some issues in
-terms of ease of tuning and reasoning about performance.  
-
-In order to deal with these issues, there is an alternative enrichment 
topology which
-uses data parallelism as opposed to the split/join task parallelism.
-This architecture uses a worker pool to fully enrich any message within 
-a worker.  This results in 
+The unified enrichment topology uses data parallelism as opposed to the 
deprecated
+split/join topology's task parallelism. This architecture uses a worker pool 
to fully
+enrich any message within a worker.  This results in
 * Fewer bolts in the topology 
 * Each bolt fully operates on a message.
 * Fewer network hops
 
-![Unified Architecture](unified_enrichment_arch.svg)
-
-This architecture is fully backwards compatible; the only difference is
-how the enrichment will operate on each message (in one bolt where the
-split/join is done in a threadpool as opposed
+This architecture is fully backwards compatible with the old split-join
+topology; the only difference is how the enrichment will operate on each
+message (in one bolt where the split/join is done in a threadpool as opposed
 to split across multiple bolts).
 
-#### Using It
-
-In order to use this, you will need to 
-* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use 
`remote-unified.yaml` instead of `remote.yaml`
-* Restart the enrichment topology.
-
 #### Configuring It
 
 There are two parameters which you might want to tune in this topology.
@@ -76,6 +62,19 @@ intel bolt, the configurations will be taken from the 
respective join bolt
 parallelism.  When proper ambari support for this is added, we will add
 its own property.
 
+### Split-Join Enrichment Topology
+
+The now-deprecated split/join topology is also available and performs 
enrichments in parallel.
+This poses some issues in terms of ease of tuning and reasoning about 
performance.
+
+![Architecture](enrichment_arch.png)
+
+#### Using It
+
+In order to use the older, deprecated topology, you will need to
+* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use 
`remote-splitjoin.yaml` instead of `remote-unified.yaml`
+* Restart the enrichment topology.
+
 ## Enrichment Configuration
 
 The configuration for the `enrichment` topology, the topology primarily
@@ -85,7 +84,6 @@ defined by JSON documents stored in zookeeper.
 There are two types of configurations at the moment, `global` and
 `sensor` specific.  
 
-
 ## Global Configuration 
 
 There are a few enrichments which have independent configurations, such
@@ -134,7 +132,6 @@ The configuration is a complex JSON object with the 
following top level fields:
 
 ### The `enrichment` Configuration
 
-
 | Field            | Description                                               
                                                                                
                                                                                
    | Example                                                          |
 
|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------|
 | `fieldToTypeMap` | In the case of a simple HBase enrichment (i.e. a 
key/value lookup), the mapping between fields and the enrichment types 
associated with those fields must be known.  This enrichment type is used as 
part of the HBase key. Note: applies to hbaseEnrichment only. | 
`"fieldToTypeMap" : { "ip_src_addr" : [ "asset_enrichment" ] }`  |

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 0677453..e0f1f0c 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -271,6 +271,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
 
   @Override
   public void cleanup() {
+    super.cleanup();
     adapter.cleanup();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index cfa101d..fcfa918 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,16 +17,12 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import com.google.common.base.Joiner;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore;
-import 
org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore;
-import 
org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
@@ -34,8 +30,6 @@ import org.apache.metron.enrichment.utils.ThreatIntelUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.threatintel.triage.ThreatTriageProcessor;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
 
b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
index 77c3a77..d3ed8ad 100755
--- 
a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
+++ 
b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
@@ -20,11 +20,11 @@ METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
 
-# there are two enrichment topologies.  by default, the split-join enrichment 
topology is executed
+# There are two enrichment topologies. By default, the unified enrichment 
topology is executed. Split-join is now deprecated.
 SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml 
--filter $METRON_HOME/config/enrichment-splitjoin.properties"
 UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml 
--filter $METRON_HOME/config/enrichment-unified.properties"
 
 # by passing in different args, the user can execute an alternative enrichment 
topology
-ARGS=${@:-$SPLIT_JOIN_ARGS}
+ARGS=${@:-$UNIFIED_ARGS}
 
 storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 588fc58..083628c 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -207,7 +207,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     }
     UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
     verify(outputCollector, times(5)).ack(tuple);
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), 
any(Values.class));
+    verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), 
any(Values.class));
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 2e1968a..cfe5752 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -46,11 +46,11 @@ import org.junit.rules.ExpectedException;
 public abstract class SearchIntegrationTest {
   /**
    * [
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", 
"ttl": "data 1", "guid":"bro_1"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 
2", "ttl": "data 2", "guid":"bro_2"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, 
"is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", 
"ttl": "data 3", "guid":"bro_3"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", 
"ttl": "data 4", "guid":"bro_4"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", 
"ttl": "data 5", "guid":"bro_5"}
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 1", 
"ttl": "data 1", "guid":"bro_1"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "method": "bro data 2", 
"ttl": "data 2", "guid":"bro_2"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, 
"is_alert":true, "location_point": "50.0,7.7455", "method": "bro data 3", 
"ttl": "data 3", "guid":"bro_3"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 4", 
"ttl": "data 4", "guid":"bro_4"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "method": "bro data 5", 
"ttl": "data 5", "guid":"bro_5"}
    * ]
    */
   @Multiline
@@ -58,11 +58,11 @@ public abstract class SearchIntegrationTest {
 
   /**
    * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, 
"is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "ttl": 1, 
"guid":"snort_1", "threat:triage:score":10.0},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "ttl": 
2, "guid":"snort_2", "threat:triage:score":20.0},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "ttl": 
3, "guid":"snort_3"},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "ttl": 
4, "guid":"snort_4"},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "ttl": 
5, "guid":"snort_5"}
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, 
"is_alert":false, "location_point": "50.0,7.7455", "sig_generator": 
"sig_generator 1", "ttl": 1, "guid":"snort_1", "threat:triage:score":10.0},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "sig_generator": 
"sig_generator 2", "ttl": 2, "guid":"snort_2", "threat:triage:score":20.0},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "sig_generator": 
"sig_generator 3", "ttl": 3, "guid":"snort_3"},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "sig_generator": 
"sig_generator 4", "ttl": 4, "guid":"snort_4"},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "sig_generator": 
"sig_generator 5", "ttl": 5, "guid":"snort_5"}
    * ]
    */
   @Multiline
@@ -272,7 +272,7 @@ public abstract class SearchIntegrationTest {
 
   /**
    * {
-   * "facetFields": ["snort_field"],
+   * "facetFields": ["sig_generator"],
    * "indices": ["bro", "snort"],
    * "query": "*:*",
    * "from": 0,
@@ -698,14 +698,14 @@ public abstract class SearchIntegrationTest {
 
     Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
     Assert.assertEquals(1, facetCounts.size());
-    Map<String, Long> snortFieldCounts = facetCounts.get("snort_field");
+    Map<String, Long> snortFieldCounts = facetCounts.get("sig_generator");
     Assert.assertEquals(5, snortFieldCounts.size());
 
-    Assert.assertEquals(1L, snortFieldCounts.get("50").longValue());
-    Assert.assertEquals(1L, snortFieldCounts.get("40").longValue());
-    Assert.assertEquals(1L, snortFieldCounts.get("30").longValue());
-    Assert.assertEquals(1L, snortFieldCounts.get("20").longValue());
-    Assert.assertEquals(1L, snortFieldCounts.get("10").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 
5").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 
4").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 
3").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 
2").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("sig_generator 
1").longValue());
     response.getFacetCounts();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-integration-test/src/main/sample/patterns/test
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-integration-test/src/main/sample/patterns/test 
b/metron-platform/metron-integration-test/src/main/sample/patterns/test
index a88a255..ebbf9c4 100644
--- a/metron-platform/metron-integration-test/src/main/sample/patterns/test
+++ b/metron-platform/metron-integration-test/src/main/sample/patterns/test
@@ -1,2 +1,3 @@
 YAF_TIME_FORMAT %{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T 
]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED}
 YAF_DELIMITED 
%{NUMBER:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:protocol}\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\|%{SPACE:UNWANTED}%{INT:ip_src_port}\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason}
+ELBACCESSLOGS %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} 
%{IP:clientip}:%{INT:clientport} (?:(%{IP:backendip}:?:%{INT:backendport})|-) 
%{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} 
%{NUMBER:response_processing_time} (?:-|%{INT:elb_status_code}) 
(?:-|%{INT:backend_status_code}) %{INT:received_bytes} %{INT:sent_bytes} 
\"(?:-|(?:%{WORD:verb} 
%{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATH:path}(?:%{URIPARAM:params})?)?(?:
 HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest}))\" 
\"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) 
(?:-|%{NOTSPACE:ssl_protocol})

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index 381b0ee..cfcf6ed 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -29,10 +29,12 @@ There are two general types types of parsers:
   * Grok parser: `org.apache.metron.parsers.GrokParser` with possible 
`parserConfig` entries of 
     * `grokPath` : The path in HDFS (or in the Jar) to the grok statement
     * `patternLabel` : The pattern label to use from the grok statement
+    * `multiLine` : The raw data passed in should be handled as a log with 
multiple lines, with each line parsed separately. Valid values are 'true' or 
'false'; the default, if unset, is 'false'. When set, the parser handles 
multiple lines, emitting successfully processed lines normally and sending 
lines with errors to the error topic.
     * `timestampField` : The field to use for timestamp
     * `timeFields` : A list of fields to be treated as time
     * `dateFormat` : The date format to use to parse the time fields
     * `timezone` : The timezone to use. `UTC` is default.
+    * The Grok parser supports either one line per incoming message or 
incoming messages with multiple log lines, and produces one JSON message per 
line.
   * CSV Parser: `org.apache.metron.parsers.csv.CSVParser` with possible 
`parserConfig` entries of
     * `timestampFormat` : The date format of the timestamp to use.  If 
unspecified, the parser assumes the timestamp is ms since unix epoch.
     * `columns` : A map of column names you wish to extract from the CSV to 
their offsets (e.g. `{ 'name' : 1, 'profession' : 3}`  would be a column map 
for extracting the 2nd and 4th columns from a CSV)
@@ -513,9 +515,13 @@ Java parser adapters are intended for higher-velocity 
topologies and are not eas
 * org.apache.metron.parsers.syslog.Syslog5424Parser : Parse Syslog RFC 5424 
messages
 
 ### Grok Parser Adapters
-Grok parser adapters are designed primarly for someone who is not a Java coder 
for quickly standing up a parser adapter for lower velocity topologies.  Grok 
relies on Regex for message parsing, which is much slower than purpose-built 
Java parsers, but is more extensible.  Grok parsers are defined via a config 
file and the topplogy does not need to be recombiled in order to make changes 
to them.  An example of a Grok perser is:
+Grok parser adapters are designed primarily for someone who is not a Java 
coder to quickly stand up a parser adapter for lower-velocity topologies.  
Grok relies on regex for message parsing, which is much slower than 
purpose-built Java parsers, but is more extensible.  Grok parsers are defined 
via a config file, and the topology does not need to be recompiled in order to 
make changes to them.  Examples of Grok parsers are:
 
-* org.apache.metron.parsers.GrokParser
+* org.apache.metron.parsers.GrokParser and 
org.apache.metron.parsers.websphere.GrokWebSphereParser
+
+Parsers that derive from GrokParser typically allow the GrokParser to parse 
the messages, and then override the methods for postParse to do further parsing.
+When this is the case, and the parser has not overridden `parse(byte[])` or 
`parseOptionalResult(byte[])`, these parsers gain support for treating 
byte[] input as multiple lines, with each line parsed as a separate message 
(and returned as such).
+This is enabled by using the `"multiLine":"true"` parser configuration option.
 
 For more information on the Grok project please refer to the following link:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/message-parser-implementation-notes.md
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/message-parser-implementation-notes.md 
b/metron-platform/metron-parsers/message-parser-implementation-notes.md
new file mode 100644
index 0000000..b8afe04
--- /dev/null
+++ b/metron-platform/metron-parsers/message-parser-implementation-notes.md
@@ -0,0 +1,57 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# `MessageParser` implementation notes 
+
+
+1. Supporting multiple JSONObject returns from a single byte[]
+The original `MessageParser` interface supported parsing a message and 
returning a `List<JSONObject>`, thereby explicitly supporting multiple 
messages from one input.
+While this is fine, it only allows for the complete failure of a message for 
any reason.  Only one exception can be thrown.  This means that if there 
_are_ multiple messages in the buffer, any one failure will necessarily fail 
all of them.
+To improve on this situation, a new method was added to the `MessageParser` 
interface (with a default implementation) that introduces a return type 
providing not only the JSONObjects produced, but also a `Map` of messages -> 
throwable.
+
+To support this in your parser, you should:
+
+- Implement the new method
+
+```java
+ @Override
+  public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] 
rawMessage)
+```
+
+- Implement the original `List<JSONObject> parse(byte[] message)` to delegate 
to that method such as below:
+
+```java
+ @Override
+  public List<JSONObject> parse(byte[] rawMessage) {
+    Optional<MessageParserResult<JSONObject>> resultOptional = 
parseOptionalResult(rawMessage);
+    if (!resultOptional.isPresent()) {
+      return Collections.emptyList();
+    }
+    Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables();
+    if (!errors.isEmpty()) {
+      throw new 
RuntimeException(errors.entrySet().iterator().next().getValue());
+    }
+
+    return resultOptional.get().getMessages();
+  }
+```
+
+- You *may* want to govern treating the incoming buffer as multiline or not by 
adding a configuration option for your parser, such as 
`"multiline":"true"|"false"`
+
+- See the org.apache.metron.parsers.GrokParser for an example of this 
implementation.
+
+The Metron system itself will call the new `parseOptionalResult` method during 
processing.  The default implementation in the interface handles backwards 
compatibility with previous implementations.
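
The per-line error handling described in these notes might look roughly like the sketch below. It is not Metron code: `MultiLineParseSketch`, `LineResult`, `parseLine`, and the `key=value` line format are hypothetical stand-ins, and the `multiLine` option name is assumed from the Grok parser documentation. A real parser would return a `MessageParserResult<JSONObject>` from `parseOptionalResult` rather than this simplified holder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: LineResult and parseLine are hypothetical stand-ins,
// not Metron's MessageParserResult or a real parser.
final class MultiLineParseSketch {

  /** Parsed messages plus a map of failed input -> cause. */
  static final class LineResult {
    final List<Map<String, String>> messages = new ArrayList<>();
    final Map<Object, Throwable> errors = new LinkedHashMap<>();
  }

  /** Parses one "key=value" line (an assumed format); throws if malformed. */
  static Map<String, String> parseLine(String line) {
    int idx = line.indexOf('=');
    if (idx <= 0) {
      throw new IllegalArgumentException("Not a key=value line: " + line);
    }
    return Collections.singletonMap(line.substring(0, idx), line.substring(idx + 1));
  }

  /** Reads the assumed "multiLine" option; absent or non-"true" means false. */
  static boolean isMultiLine(Map<String, Object> parserConfig) {
    return Boolean.parseBoolean(String.valueOf(parserConfig.get("multiLine")));
  }

  /** Parses the buffer as one message or, when multiLine is set, line by line. */
  static LineResult parseAll(byte[] rawMessage, Map<String, Object> parserConfig) {
    String text = new String(rawMessage, StandardCharsets.UTF_8);
    List<String> inputs = isMultiLine(parserConfig)
        ? Arrays.asList(text.split("\\R"))
        : Collections.singletonList(text);
    LineResult result = new LineResult();
    for (String input : inputs) {
      if (input.trim().isEmpty()) {
        continue; // skip blank lines
      }
      try {
        result.messages.add(parseLine(input));
      } catch (RuntimeException e) {
        // Record the failure against its input; the other lines still succeed.
        result.errors.put(input, e);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    byte[] raw = "a=1\nbroken\nb=2".getBytes(StandardCharsets.UTF_8);
    Map<String, Object> config = Collections.singletonMap("multiLine", "true");
    LineResult r = parseAll(raw, config);
    System.out.println(r.messages.size() + " parsed, " + r.errors.size() + " failed");
  }
}
```

Keying the error map by the offending input keeps each failure traceable to its line, which is the property the `Map` of messages -> throwable is meant to provide. With the option unset, the whole buffer is handed to `parseLine` as a single message.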

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
new file mode 100644
index 0000000..11d15eb
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class DefaultMessageParserResult<T> implements MessageParserResult<T> {
+  private List<T> messages = new ArrayList<>();
+  private Map<Object, Throwable> errors = new HashMap<>();
+  private Throwable masterThrowable;
+
+  public DefaultMessageParserResult() {
+  }
+
+  public DefaultMessageParserResult(Throwable masterThrowable) {
+    this.masterThrowable = masterThrowable;
+  }
+
+  public DefaultMessageParserResult(List<T> list) {
+    messages.addAll(list);
+  }
+
+  public DefaultMessageParserResult(Map<Object, Throwable> map) {
+    errors.putAll(map);
+  }
+
+  public DefaultMessageParserResult(List<T> list, Map<Object, Throwable> map) {
+    messages.addAll(list);
+    errors.putAll(map);
+  }
+
+  public void addMessage(T message) {
+    messages.add(message);
+  }
+
+  public void addError(Object message, Throwable throwable) {
+    errors.put(message, throwable);
+  }
+
+  @Override
+  public List<T> getMessages() {
+    return messages;
+  }
+
+  @Override
+  public Map<Object, Throwable> getMessageThrowables() {
+    return errors;
+  }
+
+  @Override
+  public Optional<Throwable> getMasterThrowable() {
+    return Optional.ofNullable(masterThrowable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
new file mode 100644
index 0000000..79a9b5d
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of ParserRunnerResults.
+ */
+public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {
+
+  private List<JSONObject> messages = new ArrayList<>();
+  private List<MetronError> errors = new ArrayList<>();
+
+  public List<JSONObject> getMessages() {
+    return messages;
+  }
+
+  public List<MetronError> getErrors() {
+    return errors;
+  }
+
+  public void addMessage(JSONObject message) {
+    this.messages.add(message);
+  }
+
+  public void addError(MetronError error) {
+    this.errors.add(error);
+  }
+
+  public void addErrors(List<MetronError> errors) {
+    this.errors.addAll(errors);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ParserRunnerResults parserResult = (ParserRunnerResults) o;
+    return Objects.equals(messages, parserResult.getMessages()) &&
+            Objects.equals(errors, parserResult.getErrors());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = messages != null ? messages.hashCode() : 0;
+    result = 31 * result + (errors != null ? errors.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index 99ac390..6bdfb81 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,32 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.metron.parsers;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.Constants;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.io.StringReader;
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TimeZone;
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.Constants;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
@@ -48,6 +59,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   protected transient Grok grok;
   protected String grokPath;
+  protected boolean multiLine = false;
   protected String patternLabel;
   protected List<String> timeFields = new ArrayList<>();
   protected String timestampField;
@@ -55,8 +67,13 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   protected String patternsCommonDir = "/patterns/common";
 
   @Override
+  @SuppressWarnings("unchecked")
   public void configure(Map<String, Object> parserConfig) {
     this.grokPath = (String) parserConfig.get("grokPath");
+    String multiLineString = (String) parserConfig.get("multiLine");
+    if (!StringUtils.isBlank(multiLineString)) {
+      multiLine = Boolean.parseBoolean(multiLineString);
+    }
     this.patternLabel = (String) parserConfig.get("patternLabel");
     this.timestampField = (String) parserConfig.get("timestampField");
     List<String> timeFieldsParam = (List<String>) parserConfig.get("timeFields");
@@ -126,11 +143,73 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   @SuppressWarnings("unchecked")
   @Override
-  public List<JSONObject> parse(byte[] rawMessage) {
+  public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
     if (grok == null) {
       init();
     }
+    if (multiLine) {
+      return parseMultiLine(rawMessage);
+    }
+    return parseSingleLine(rawMessage);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Optional<MessageParserResult<JSONObject>> parseMultiLine(byte[] rawMessage) {
     List<JSONObject> messages = new ArrayList<>();
+    Map<Object,Throwable> errors = new HashMap<>();
+    String originalMessage = null;
+    // read the incoming raw data as if it may have multiple lines of logs
+    // if there is only one line, it will just get processed.
+    try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) {
+      while ((originalMessage = reader.readLine()) != null) {
+        LOG.debug("Grok parser parsing message: {}", originalMessage);
+        try {
+          Match gm = grok.match(originalMessage);
+          gm.captures();
+          JSONObject message = new JSONObject();
+          message.putAll(gm.toMap());
+
+          if (message.size() == 0) {
+            Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: "
+                    + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
+                    + grokPath);
+            errors.put(originalMessage, rte);
+            continue;
+          }
+          message.put("original_string", originalMessage);
+          for (String timeField : timeFields) {
+            String fieldValue = (String) message.get(timeField);
+            if (fieldValue != null) {
+              message.put(timeField, toEpoch(fieldValue));
+            }
+          }
+          if (timestampField != null) {
+            message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
+          }
+          message.remove(patternLabel);
+          postParse(message);
+          messages.add(message);
+          LOG.debug("Grok parser parsed message: {}", message);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          errors.put(originalMessage, e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      Exception innerException = new IllegalStateException("Grok parser Error: "
+              + e.getMessage()
+              + " on "
+              + originalMessage, e);
+      return Optional.of(new DefaultMessageParserResult<>(innerException));
+    }
+    return Optional.of(new DefaultMessageParserResult<>(messages, errors));
+  }
+
+  @SuppressWarnings("unchecked")
+  private Optional<MessageParserResult<JSONObject>> parseSingleLine(byte[] rawMessage) {
+    List<JSONObject> messages = new ArrayList<>();
+    Map<Object,Throwable> errors = new HashMap<>();
     String originalMessage = null;
     try {
       originalMessage = new String(rawMessage, "UTF-8");
@@ -140,30 +219,36 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       JSONObject message = new JSONObject();
       message.putAll(gm.toMap());
 
-      if (message.size() == 0)
-        throw new RuntimeException("Grok statement produced a null message. Original message was: "
+      if (message.size() == 0) {
+        Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: "
                 + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
                 + grokPath);
-
-      message.put("original_string", originalMessage);
-      for (String timeField : timeFields) {
-        String fieldValue = (String) message.get(timeField);
-        if (fieldValue != null) {
-          message.put(timeField, toEpoch(fieldValue));
+        errors.put(originalMessage, rte);
+      } else {
+        message.put("original_string", originalMessage);
+        for (String timeField : timeFields) {
+          String fieldValue = (String) message.get(timeField);
+          if (fieldValue != null) {
+            message.put(timeField, toEpoch(fieldValue));
+          }
         }
+        if (timestampField != null) {
+          message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
+        }
+        message.remove(patternLabel);
+        postParse(message);
+        messages.add(message);
+        LOG.debug("Grok parser parsed message: {}", message);
       }
-      if (timestampField != null) {
-        message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
-      }
-      message.remove(patternLabel);
-      postParse(message);
-      messages.add(message);
-      LOG.debug("Grok parser parsed message: {}", message);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-      throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
+      Exception innerException = new IllegalStateException("Grok parser Error: "
+              + e.getMessage()
+              + " on "
+              + originalMessage, e);
+      return Optional.of(new DefaultMessageParserResult<>(innerException));
     }
-    return messages;
+    return Optional.of(new DefaultMessageParserResult<JSONObject>(messages, errors));
   }
 
   @Override
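
The multi-line path added to GrokParser above reads the raw payload line by line, keeps every line that parses, and records a per-line error for every line that does not, instead of failing the whole payload. A minimal, self-contained sketch of that pattern follows. It is not Metron code: `MultiLineSketch`, its `Result` holder, and the regular expression standing in for a compiled Grok pattern are all hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the multi-line parsing path; not part of Metron.
final class MultiLineSketch {

  // Hypothetical result holder: parsed messages plus failures keyed by the offending line.
  static final class Result {
    final List<Map<String, Object>> messages = new ArrayList<>();
    final Map<Object, Throwable> errors = new HashMap<>();
  }

  // Assumption: a plain regex plays the role of the Grok pattern.
  private static final Pattern LINE = Pattern.compile("^(?<level>[A-Z]+) (?<text>.+)$");

  static Result parse(byte[] rawMessage) {
    Result result = new Result();
    String payload = new String(rawMessage, StandardCharsets.UTF_8);
    try (BufferedReader reader = new BufferedReader(new StringReader(payload))) {
      String line;
      while ((line = reader.readLine()) != null) {
        Matcher matcher = LINE.matcher(line);
        if (!matcher.matches()) {
          // Record the failure against this line and continue with the next one.
          result.errors.put(line, new RuntimeException("No match for: " + line));
          continue;
        }
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("level", matcher.group("level"));
        message.put("text", matcher.group("text"));
        message.put("original_string", line);
        result.messages.add(message);
      }
    } catch (IOException e) {
      // Reading from an in-memory string; surface any failure as a single overall error.
      throw new IllegalStateException("Error reading payload", e);
    }
    return result;
  }
}
```

With this shape, one bad line in a batch costs only that line; the caller still receives the other parsed messages along with an error entry it can route separately.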

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
new file mode 100644
index 0000000..f9123b1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
+ * The information needed to initialize a MessageParser is supplied by the parser config supplier.  After the parsers
+ * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
+ * that contains a list of parsed messages and/or a list of errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunner<T> {
+
+  /**
+   * Return a list of all sensor types that can be parsed with this ParserRunner.
+   * @return Sensor types
+   */
+  Set<String> getSensorTypes();
+
+  /**
+   * Initializes the MessageParsers using the supplied parser configurations.
+   * @param parserConfigSupplier Supplies parser configurations
+   * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+   */
+  void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);
+
+  /**
+   * Parses a message and either returns the message or an error.
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
new file mode 100644
index 0000000..a986db7
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of a ParserRunner.
+ */
+public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
+
+  class ProcessResult {
+
+    private JSONObject message;
+    private MetronError error;
+
+    public ProcessResult(JSONObject message) {
+      this.message = message;
+    }
+
+    public ProcessResult(MetronError error) {
+      this.error = error;
+    }
+
+    public JSONObject getMessage() {
+      return message;
+    }
+
+    public MetronError getError() {
+      return error;
+    }
+
+    public boolean isError() {
+      return error != null;
+    }
+  }
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected transient Consumer<ParserRunnerResults> onSuccess;
+  protected transient Consumer<MetronError> onError;
+
+  private HashSet<String> sensorTypes;
+  private Map<String, ParserComponent> sensorToParserComponentMap;
+
+  // Stellar variables
+  private transient Context stellarContext;
+
+  public ParserRunnerImpl(HashSet<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
+  }
+
+  public Map<String, ParserComponent> getSensorToParserComponentMap() {
+    return sensorToParserComponentMap;
+  }
+
+  public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
+    this.sensorToParserComponentMap = sensorToParserComponentMap;
+  }
+
+  public Context getStellarContext() {
+    return stellarContext;
+  }
+
+  @Override
+  public Set<String> getSensorTypes() {
+    return sensorTypes;
+  }
+
+  @Override
+  public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
+    if (parserConfigSupplier == null) {
+      throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
+    }
+    if (stellarContext == null) {
+      throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
+    }
+    }
+    this.stellarContext = stellarContext;
+    initializeParsers(parserConfigSupplier);
+  }
+
+  /**
+   * Parses messages with the appropriate MessageParser based on sensor type.  The resulting list of messages is then
+   * post-processed and added to the ParserRunnerResults message list.  Any errors that happen during post-processing are
+   * added to the ParserRunnerResults error list.  Any exceptions (including a master exception) thrown by the MessageParser
+   * are also added to the ParserRunnerResults error list.
+   *
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  @Override
+  public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+    DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    if (sensorParserConfig != null) {
+      MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser();
+      Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
+      if (optionalMessageParserResult.isPresent()) {
+        MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
+
+        // Process each message returned from the MessageParser
+        messageParserResult.getMessages().forEach(message -> {
+                  Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
+                  if (processResult.isPresent()) {
+                    if (processResult.get().isError()) {
+                      parserRunnerResults.addError(processResult.get().getError());
+                    } else {
+                      parserRunnerResults.addMessage(processResult.get().getMessage());
+                    }
+                  }
+                });
+
+        // If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors
+        messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(throwable)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(rawMessage.getMessage())));
+
+        // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors
+        parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(entry.getValue())
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(entry.getKey())).collect(Collectors.toList()));
+      }
+    } else {
+      throw new IllegalStateException(String.format("Could not execute parser.  Cannot find configuration for sensor %s.",
+              sensorType));
+    }
+    return parserRunnerResults;
+  }
+
+  /**
+   * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner.  Objects are created
+   * using reflection and the MessageParser configure and init methods are called.
+   * @param parserConfigSupplier Parser configurations
+   */
+  private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
+    LOG.info("Initializing parsers...");
+    sensorToParserComponentMap = new HashMap<>();
+    for(String sensorType: sensorTypes) {
+      if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) {
+        throw new IllegalStateException(String.format("Could not initialize parsers.  Cannot find configuration for sensor %s.",
+                sensorType));
+      }
+
+      SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
+
+      LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
+              sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
+
+      // create message parser
+      MessageParser<JSONObject> parser = ReflectionUtils
+              .createInstance(parserConfig.getParserClassName());
+
+      // create message filter
+      MessageFilter<JSONObject> filter = null;
+      parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
+      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+        filter = Filters.get(
+                parserConfig.getFilterClassName(),
+                parserConfig.getParserConfig()
+        );
+      }
+
+      parser.configure(parserConfig.getParserConfig());
+      parser.init();
+      sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
+    }
+  }
+
+  /**
+   * Post-processes parsed messages by:
+   * <ul>
+   *   <li>Applying field transformations defined in the sensor parser config</li>
+   *   <li>Filtering messages using the configured MessageFilter class</li>
+   *   <li>Validating messages using the MessageParser validate method</li>
+   * </ul>
+   * If a message is successfully processed a message is returned in a ProcessResult.  If a message fails
+   * validation, a MetronError object is created and returned in a ProcessResult.  If a message is
+   * filtered out an empty Optional is returned.
+   *
+   * @param sensorType Sensor type of the message
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param parser MessageParser for the sensor type
+   * @param parserConfigurations Parser configurations
+   */
+  @SuppressWarnings("unchecked")
+  protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
+                                                  MessageParser<JSONObject> parser,
+                                                  ParserConfigurations parserConfigurations
+                                                  ) {
+    Optional<ProcessResult> processResult = Optional.empty();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+            message,
+            rawMessage.getMetadata(),
+            sensorParserConfig.getMergeMetadata(),
+            sensorParserConfig.getRawMessageStrategyConfig()
+    );
+    message.put(Constants.SENSOR_TYPE, sensorType);
+    applyFieldTransformations(message, rawMessage, sensorParserConfig);
+    if (!message.containsKey(Constants.GUID)) {
+      message.put(Constants.GUID, UUID.randomUUID().toString());
+    }
+    MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
+    if (filter == null || filter.emit(message, stellarContext)) {
+      boolean isInvalid = !parser.validate(message);
+      List<FieldValidator> failedValidators = null;
+      if (!isInvalid) {
+        failedValidators = getFailedValidators(message, parserConfigurations);
+        isInvalid = !failedValidators.isEmpty();
+      }
+      if (isInvalid) {
+        MetronError error = new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(message);
+        Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+                .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                .collect(Collectors.toSet());
+        if (errorFields != null && !errorFields.isEmpty()) {
+          error.withErrorFields(errorFields);
+        }
+        processResult = Optional.of(new ProcessResult(error));
+      } else {
+        processResult = Optional.of(new ProcessResult(message));
+      }
+    }
+    return processResult;
+  }
+
+  /**
+   * Applies Stellar field transformations defined in the sensor parser config.
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param sensorParserConfig Sensor parser config
+   */
+  private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
+    for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+      if (handler != null) {
+        if (!sensorParserConfig.getMergeMetadata()) {
+          //if we haven't merged metadata, then we need to pass them along as configuration params.
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig(),
+                  rawMessage.getMetadata()
+          );
+        } else {
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig()
+          );
+        }
+      }
+    }
+  }
+
+  private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
+    List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
+    List<FieldValidator> failedValidators = new ArrayList<>();
+    for(FieldValidator validator : fieldValidations) {
+      if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
+        failedValidators.add(validator);
+      }
+    }
+    return failedValidators;
+  }
+}
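
The processMessage Javadoc above describes three possible outcomes for a parsed message: it is kept, it fails validation and becomes an error, or it is filtered out and nothing is returned. The sketch below illustrates that three-way contract with `Optional`. It is only an illustration under stated assumptions: `ProcessSketch`, `Outcome`, and the use of `Predicate<String>` in place of MessageFilter and the validators are hypothetical and do not mirror the Metron types.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical illustration of the keep / error / filtered-out contract; not Metron code.
final class ProcessSketch {

  // Holds either a message or an error description, never both.
  static final class Outcome {
    final String message;
    final String error;

    private Outcome(String message, String error) {
      this.message = message;
      this.error = error;
    }

    static Outcome ok(String message) {
      return new Outcome(message, null);
    }

    static Outcome failed(String error) {
      return new Outcome(null, error);
    }

    boolean isError() {
      return error != null;
    }
  }

  // filter may be null, meaning every message is emitted (as with an unset filter above).
  static Optional<Outcome> process(String message, Predicate<String> filter, Predicate<String> validator) {
    if (filter != null && !filter.test(message)) {
      // Filtered out: neither a message nor an error is reported.
      return Optional.empty();
    }
    if (!validator.test(message)) {
      // Emitted by the filter but invalid: report as an error.
      return Optional.of(Outcome.failed("invalid: " + message));
    }
    return Optional.of(Outcome.ok(message));
  }
}
```

Returning an empty `Optional` for filtered messages lets the caller distinguish "intentionally dropped" from "failed", which is why the execute loop only adds to either list when a result is present.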

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
new file mode 100644
index 0000000..7ca853c
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+
+import java.util.List;
+
+/**
+ * Container for the results of parsing a message with a ParserRunner.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunnerResults<T> {
+
+  List<T> getMessages();
+
+  List<MetronError> getErrors();
+}
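
Because ParserRunnerResults exposes parsed messages and errors as two separate lists, a caller can route each list independently. The following sketch shows one way a consumer might do that. The names `ResultsSketch` and `ResultsRouter` are hypothetical, errors are modeled as plain strings rather than MetronError, and nothing here is taken from the ParserBolt changes in this commit.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical two-list result container, loosely modeled on the interface above.
interface ResultsSketch<T> {
  List<T> getMessages();

  List<String> getErrors();

  // Convenience factory for the sketch only.
  static <T> ResultsSketch<T> of(List<T> messages, List<String> errors) {
    return new ResultsSketch<T>() {
      @Override
      public List<T> getMessages() {
        return messages;
      }

      @Override
      public List<String> getErrors() {
        return errors;
      }
    };
  }
}

// Hypothetical consumer: sends messages and errors to separate handlers.
final class ResultsRouter {

  // Returns the total number of items handled.
  static <T> int route(ResultsSketch<T> results, Consumer<T> onMessage, Consumer<String> onError) {
    results.getMessages().forEach(onMessage);
    results.getErrors().forEach(onError);
    return results.getMessages().size() + results.getErrors().size();
  }
}
```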

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 213d02c..a9ee305 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,28 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.parsers.bolt;
 
+package org.apache.metron.parsers.bolt;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
 import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
-import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
@@ -45,10 +38,8 @@ import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerResults;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -71,26 +62,27 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
-  private Map<String, ParserComponents> sensorToComponentMap;
+  private ParserRunner<JSONObject> parserRunner;
+  private Map<String, WriterHandler> sensorToWriterMap;
   private Map<String, String> topicToSensorMap = new HashMap<>();
 
-  private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
-  private transient Cache<CachingStellarProcessor.Key, Object> cache;
   private int requestedTickFreqSecs;
   private int defaultBatchTimeout;
   private int batchTimeoutDivisor = 1;
 
   public ParserBolt( String zookeeperUrl
-                   , Map<String, ParserComponents> sensorToComponentMap
+                   , ParserRunner parserRunner
+                     , Map<String, WriterHandler> sensorToWriterMap
   ) {
     super(zookeeperUrl);
-    this.sensorToComponentMap = sensorToComponentMap;
+    this.parserRunner = parserRunner;
+    this.sensorToWriterMap = sensorToWriterMap;
 
     // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
     Boolean handleAcks = null;
-    for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-      boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+    for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+      boolean writerHandleAck = entry.getValue().handleAck();
       if (handleAcks == null) {
         handleAcks = writerHandleAck;
       } else if (!handleAcks.equals(writerHandleAck)) {
@@ -126,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   /**
    * Used only for unit testing
-   * @param defaultBatchTimeout
    */
-  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    this.defaultBatchTimeout = defaultBatchTimeout;
+  public int getBatchTimeoutDivisor() {
+    return batchTimeoutDivisor;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) {
+    this.sensorToWriterMap = sensorToWriterMap;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  protected Map<String, String> getTopicToSensorMap() {
+    return topicToSensorMap;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) {
+    this.topicToSensorMap = topicToSensorMap;
   }
 
   /**
    * Used only for unit testing
    */
-  public int getDefaultBatchTimeout() {
-    return defaultBatchTimeout;
+  public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) {
+    this.messageGetStrategy = messageGetStrategy;
   }
 
-  public Map<String, ParserComponents> getSensorToComponentMap() {
-    return sensorToComponentMap;
+  /**
+   * Used only for unit testing
+   */
+  public void setOutputCollector(OutputCollector collector) {
+    this.collector = collector;
   }
 
   /**
@@ -155,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
     // else Storm will throw a runtime error.
     Function<WriterConfiguration, WriterConfiguration> configurationXform;
-    WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+    WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue();
     if (writer.isWriterToBulkWriter()) {
       configurationXform = WriterToBulkWriter.TRANSFORMATION;
     } else {
@@ -185,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
-
-    // Build the Stellar cache
-    Map<String, Object> cacheConfig = new HashMap<>();
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
-      String sensor = entry.getKey();
-      SensorParserConfig config = getSensorParserConfig(sensor);
-
-      if (config != null) {
-        cacheConfig.putAll(config.getCacheConfig());
-      }
-    }
-    cache = CachingStellarProcessor.createCache(cacheConfig);
+    this.parserRunner.init(this::getConfigurations, initializeStellar());
 
     // Need to prep all sensors
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+    for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
       String sensor = entry.getKey();
-      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
-
-      initializeStellar();
-      if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
-        getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
-        if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
-          MessageFilter<JSONObject> filter = Filters.get(
-              getSensorParserConfig(sensor).getFilterClassName(),
-              getSensorParserConfig(sensor).getParserConfig()
-          );
-          getSensorToComponentMap().get(sensor).setFilter(filter);
-        }
-      }
-
-      parser.init();
 
       SensorParserConfig config = getSensorParserConfig(sensor);
       if (config != null) {
@@ -225,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         throw new IllegalStateException(
             "Unable to retrieve a parser config for " + sensor);
       }
-      parser.configure(config.getParserConfig());
 
-      WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+      WriterHandler writer = sensorToWriterMap.get(sensor);
       writer.init(stormConf, context, collector, getConfigurations());
       if (defaultBatchTimeout == 0) {
         //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
@@ -242,169 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     }
   }
 
-  protected void initializeStellar() {
-    Context.Builder builder = new Context.Builder()
-                                .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                                .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                ;
-    if(cache != null) {
-      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
-    }
-    this.stellarContext = builder.build();
-    StellarFunctions.initialize(stellarContext);
-  }
-
 
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
     if (TupleUtils.isTick(tuple)) {
-      try {
-        for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-          entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
-      } finally {
-        collector.ack(tuple);
-      }
+      handleTickTuple(tuple);
       return;
     }
-
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+    String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+    String sensorType = topicToSensorMap.get(topic);
     try {
-      SensorParserConfig sensorParserConfig;
-      MessageParser<JSONObject> parser;
-      String sensor;
-      Map<String, Object> metadata;
-      if (sensorToComponentMap.size() == 1) {
-        // There's only one parser, so grab info directly
-        Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
-            .next();
-        sensor = sensorParser.getKey();
-        parser = sensorParser.getValue().getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      } else {
-        // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
-        String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
-        sensor = topicToSensorMap.get(topic);
-        parser = sensorToComponentMap.get(sensor).getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      }
+      ParserConfigurations parserConfigurations = getConfigurations();
+      SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+      RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+              , tuple
+              , originalMessage
+              , sensorParserConfig.getReadMetadata()
+              , sensorParserConfig.getRawMessageStrategyConfig()
+      );
+      ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations);
+      long numWritten = parserRunnerResults.getMessages().stream()
+              .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector))
+              .filter(result -> result)
+              .count();
+      parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error));
 
-      List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
-      boolean ackTuple = false;
-      int numWritten = 0;
-      if (sensorParserConfig != null) {
-        RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
-            , tuple
-            , originalMessage
-            , sensorParserConfig.getReadMetadata()
-            , sensorParserConfig.getRawMessageStrategyConfig()
-        );
-        metadata = rawMessage.getMetadata();
-
-        Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage());
-        for (JSONObject message : messages.orElse(Collections.emptyList())) {
-          //we want to ack the tuple in the situation where we have are not doing a bulk write
-          //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-          WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
-          ackTuple = !writer.handleAck();
-
-          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
-              message,
-              metadata,
-              sensorParserConfig.getMergeMetadata(),
-              sensorParserConfig.getRawMessageStrategyConfig()
-          );
-          message.put(Constants.SENSOR_TYPE, sensor);
-
-          for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
-            if (handler != null) {
-              if (!sensorParserConfig.getMergeMetadata()) {
-                //if we haven't merged metadata, then we need to pass them along as configuration params.
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig(),
-                    metadata
-                );
-              } else {
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig()
-                );
-              }
-            }
-          }
-          if (!message.containsKey(Constants.GUID)) {
-            message.put(Constants.GUID, UUID.randomUUID().toString());
-          }
-
-          MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
-          if (filter == null || filter.emitTuple(message, stellarContext)) {
-            boolean isInvalid = !parser.validate(message);
-            List<FieldValidator> failedValidators = null;
-            if (!isInvalid) {
-              failedValidators = getFailedValidators(message, fieldValidations);
-              isInvalid = !failedValidators.isEmpty();
-            }
-            if (isInvalid) {
-              MetronError error = new MetronError()
-                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                  .withSensorType(Collections.singleton(sensor))
-                  .addRawMessage(message);
-              Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
-                  .flatMap(fieldValidator -> fieldValidator.getInput().stream())
-                  .collect(Collectors.toSet());
-              if (errorFields != null && !errorFields.isEmpty()) {
-                error.withErrorFields(errorFields);
-              }
-              ErrorUtils.handleError(collector, error);
-            } else {
-              numWritten++;
-              writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
-            }
-          }
-        }
-      }
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)
       //then we want to handle the ack ourselves.
-      if (ackTuple || numWritten == 0) {
+      if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
         collector.ack(tuple);
       }
 
     } catch (Throwable ex) {
-      handleError(originalMessage, tuple, ex, collector);
+      handleError(sensorType, originalMessage, tuple, ex, collector);
+      collector.ack(tuple);
+    }
+  }
+
+  protected Context initializeStellar() {
+    Map<String, Object> cacheConfig = new HashMap<>();
+    for (String sensorType: this.parserRunner.getSensorTypes()) {
+      SensorParserConfig config = getSensorParserConfig(sensorType);
+
+      if (config != null) {
+        cacheConfig.putAll(config.getCacheConfig());
+      }
+    }
+    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig);
+
+    Context.Builder builder = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
+            ;
+    if(cache != null) {
+      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+    }
+    Context stellarContext = builder.build();
+    StellarFunctions.initialize(stellarContext);
+    return stellarContext;
+  }
+
+  protected void handleTickTuple(Tuple tuple) {
+    try {
+      for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+        entry.getValue().flush(getConfigurations(), messageGetStrategy);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+              "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
+    } finally {
+      collector.ack(tuple);
     }
   }
 
-  protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+  protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) {
+    WriterHandler writer = sensorToWriterMap.get(sensorType);
+    try {
+      writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
+      return true;
+    } catch (Exception ex) {
+      handleError(sensorType, originalMessage, tuple, ex, collector);
+      return false;
+    }
+  }
+
+  protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(ex)
-            .withSensorType(sensorToComponentMap.keySet())
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(originalMessage);
     ErrorUtils.handleError(collector, error);
-    collector.ack(tuple);
-  }
-
-  private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
-    List<FieldValidator> failedValidators = new ArrayList<>();
-    for(FieldValidator validator : validators) {
-      if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
-        failedValidators.add(validator);
-      }
-    }
-    return failedValidators;
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
     declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index 9cdafa3..1fa1feb 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{
    */
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     String protocol = (String) message.get(_key);
     return _known_protocols.contains(protocol);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
index 15a035a..8300ff4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> {
   }
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     VariableResolver resolver = new MapVariableResolver(message);
     return processor.parse(query, resolver, functionResolver, context);
   }
