http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
new file mode 100644
index 0000000..e24a39d
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class MultiLineGrokParserTest {
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in
+   * it will be parsed into the correct number of messages.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLegacyInterfaceReturnsMultiline() throws IOException, 
ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine", getMultiLine());
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      Optional<MessageParserResult<JSONObject>> resultOptional = 
grokParser.parseOptionalResult(rawMessage);
+      Assert.assertNotNull(resultOptional);
+      Assert.assertTrue(resultOptional.isPresent());
+      List<JSONObject> parsedList = resultOptional.get().getMessages();
+      Assert.assertEquals(10, parsedList.size());
+    }
+  }
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in
+   * it will be parsed into the correct number of messages using the
+   * parseOptionalResult call.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOptionalResultReturnsMultiline() throws IOException, 
ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine", getMultiLine());
+
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      Optional<MessageParserResult<JSONObject>> resultOptional = 
grokParser.parseOptionalResult(rawMessage);
+      Assert.assertTrue(resultOptional.isPresent());
+      Optional<Throwable> throwableOptional = 
resultOptional.get().getMasterThrowable();
+      List<JSONObject>  resultList = resultOptional.get().getMessages();
+      Map<Object,Throwable> errorMap = 
resultOptional.get().getMessageThrowables();
+      Assert.assertFalse(throwableOptional.isPresent());
+      Assert.assertEquals(0, errorMap.size());
+      Assert.assertEquals(10, resultList.size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map getTestData() {
+
+    Map testData = new HashMap<String, String>();
+    String input;
+    try (FileInputStream stream = new FileInputStream(new 
File("src/test/resources/logData/multi_elb_log.txt"))) {
+      input = IOUtils.toString(stream);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("failed to open file", ioe);
+    }
+    // not checking values, just that we get the right number of messages
+    testData.put(input, "");
+    return testData;
+
+  }
+
+  public String getMultiLine() { return "true";}
+
+  public String getGrokPath() {
+    return "../metron-integration-test/src/main/sample/patterns/test";
+  }
+
+  public String getGrokPatternLabel() {
+    return "ELBACCESSLOGS";
+  }
+
+  public List<String> getTimeFields() {
+    return new ArrayList<String>() {{
+      add("timestamp");
+    }};
+  }
+
+  public String getDateFormat() {
+    return "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
+  }
+
+  public String getTimestampField() {
+    return "timestamp";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
new file mode 100644
index 0000000..8ab8246
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Exercises {@link GrokParser} with the {@code multiLine} option set to
+ * {@code "true"} against a sample ELB access log for which the tests expect
+ * some entries to fail parsing.
+ */
+public class MultiLineWithErrorsGrokParserTest {
+
+  /** Location of the sample log handed to the parser. */
+  private static final String LOG_FILE = "src/test/resources/logData/multi_elb_with_errors_log.txt";
+
+  /**
+   * Charset used both to read the sample log and to turn it back into bytes,
+   * so the test does not depend on the platform default charset.
+   */
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed to
+   * parse(byte[]), a {@link RuntimeException} is thrown.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test(expected = RuntimeException.class)
+  public void testLegacyInterfaceThrowsOneExceptionWithMultiline() throws IOException, ParseException {
+    GrokParser grokParser = createParser();
+    for (Map.Entry<String, String> e : getTestData().entrySet()) {
+      // The return value is irrelevant: the call itself is expected to throw.
+      grokParser.parse(e.getKey().getBytes(UTF_8));
+    }
+  }
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in, the
+   * parseOptionalResult call reports both the messages that parsed and the
+   * per-message errors, without a master throwable.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  public void testResultInterfaceReturnsErrorsAndMessagesWithMultiline() throws IOException, ParseException {
+    GrokParser grokParser = createParser();
+    for (Map.Entry<String, String> e : getTestData().entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes(UTF_8);
+      Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+      Assert.assertTrue(resultOptional.isPresent());
+      MessageParserResult<JSONObject> result = resultOptional.get();
+      Optional<Throwable> throwableOptional = result.getMasterThrowable();
+      List<JSONObject> resultList = result.getMessages();
+      Map<Object, Throwable> errorMap = result.getMessageThrowables();
+      Assert.assertFalse(throwableOptional.isPresent());
+      Assert.assertEquals(3, errorMap.size());
+      Assert.assertEquals(10, resultList.size());
+    }
+  }
+
+  /**
+   * Builds a {@link GrokParser} configured from this class's getter methods
+   * and initializes it.
+   * @return the configured and initialized parser
+   */
+  private GrokParser createParser() {
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine", getMultiLine());
+
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+    return grokParser;
+  }
+
+  /**
+   * Reads the whole sample log into a single map entry.
+   * @return a map whose only key is the full text of the sample log; the
+   *         value is an empty string because the tests never look at it
+   * @throws IllegalStateException if the sample log can't be read
+   */
+  public Map<String, String> getTestData() {
+    String input;
+    try (FileInputStream stream = new FileInputStream(new File(LOG_FILE))) {
+      input = IOUtils.toString(stream, UTF_8.name());
+    } catch (IOException ioe) {
+      throw new IllegalStateException("failed to open file", ioe);
+    }
+    Map<String, String> testData = new HashMap<>();
+    // not checking values, just that we get the right number of messages
+    testData.put(input, "");
+    return testData;
+  }
+
+  /** @return the value placed under the {@code grokPath} config key */
+  public String getGrokPath() {
+    return "../metron-integration-test/src/main/sample/patterns/test";
+  }
+
+  /** @return the value placed under the {@code patternLabel} config key */
+  public String getGrokPatternLabel() {
+    return "ELBACCESSLOGS";
+  }
+
+  /** @return the value placed under the {@code timeFields} config key */
+  public List<String> getTimeFields() {
+    // A plain list avoids creating an anonymous ArrayList subclass.
+    List<String> timeFields = new ArrayList<>();
+    timeFields.add("timestamp");
+    return timeFields;
+  }
+
+  /** @return the value placed under the {@code multiLine} config key */
+  public String getMultiLine() {
+    return "true";
+  }
+
+  /** @return the value placed under the {@code dateFormat} config key */
+  public String getDateFormat() {
+    return "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
+  }
+
+  /** @return the value placed under the {@code timestampField} config key */
+  public String getTimestampField() {
+    return "timestamp";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
new file mode 100644
index 0000000..5f05b24
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.filters.StellarFilter;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.parsers.ParserRunnerImpl.ProcessResult;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ParserRunnerImpl.class, ReflectionUtils.class, Filters.class})
+public class ParserRunnerImplTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  /**
+   {
+   "fieldValidations" : [
+     {
+       "input" : [ "ip_src_addr", "ip_dst_addr"],
+       "validation" : "IP"
+     }
+   ]
+   }
+   */
+  @Multiline
+  private String globalConfigString;
+
+  /**
+   {
+     "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+     "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
+     "sensorTopic":"bro",
+     "parserConfig": {
+       "field": "value"
+     },
+     "fieldTransformations" : [
+       {
+         "input" : "field1",
+         "transformation" : "REMOVE"
+       }
+     ]
+   }
+   */
+  @Multiline
+  private String broConfigString;
+
+  /**
+   {
+     "parserClassName":"org.apache.metron.parsers.snort.BasicSnortParser",
+     "sensorTopic":"snort",
+     "parserConfig": {}
+   }
+   */
+  @Multiline
+  private String snortConfigString;
+
+  private ParserConfigurations parserConfigurations;
+  private MessageParser<JSONObject> broParser;
+  private MessageParser<JSONObject> snortParser;
+  private MessageFilter<JSONObject> stellarFilter;
+  private ParserRunnerImpl parserRunner;
+
+
+  /**
+   * Loads the "bro" and "snort" sensor configs and the global config from the
+   * {@code @Multiline} fields, creates the runner under test for both sensors,
+   * and stubs the static {@code ReflectionUtils} and {@code Filters} factories
+   * so they hand back the mock parsers and the mock filter.
+   */
+  @Before
+  public void setup() throws IOException {
+    SensorParserConfig broSensorConfig = SensorParserConfig.fromBytes(broConfigString.getBytes());
+    SensorParserConfig snortSensorConfig = SensorParserConfig.fromBytes(snortConfigString.getBytes());
+
+    parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("bro", broSensorConfig);
+    parserConfigurations.updateSensorParserConfig("snort", snortSensorConfig);
+    parserConfigurations.updateGlobalConfig(
+            JSONUtils.INSTANCE.load(globalConfigString, JSONUtils.MAP_SUPPLIER));
+
+    parserRunner = new ParserRunnerImpl(new HashSet<>(Arrays.asList("bro", "snort")));
+
+    broParser = mock(MessageParser.class);
+    snortParser = mock(MessageParser.class);
+    stellarFilter = mock(StellarFilter.class);
+
+    // Static factories must be mocked before they can be stubbed.
+    mockStatic(ReflectionUtils.class);
+    mockStatic(Filters.class);
+    when(ReflectionUtils.createInstance("org.apache.metron.parsers.bro.BasicBroParser"))
+            .thenReturn(broParser);
+    when(ReflectionUtils.createInstance("org.apache.metron.parsers.snort.BasicSnortParser"))
+            .thenReturn(snortParser);
+    when(Filters.get("org.apache.metron.parsers.filters.StellarFilter", broSensorConfig.getParserConfig()))
+            .thenReturn(stellarFilter);
+  }
+
+  /** Initializing without a parser config supplier is expected to fail. */
+  @Test
+  public void shouldThrowExceptionOnEmptyParserSupplier() {
+    String expectedMessage = "A parser config supplier must be set before initializing the ParserRunner.";
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage(expectedMessage);
+
+    parserRunner.init(null, null);
+  }
+
+  /** Initializing with a config supplier but no Stellar context is expected to fail. */
+  @Test
+  public void shouldThrowExceptionOnEmptyStellarContext() {
+    String expectedMessage = "A stellar context must be set before initializing the ParserRunner.";
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage(expectedMessage);
+
+    parserRunner.init(() -> parserConfigurations, null);
+  }
+
+  @Test
+  public void initShouldThrowExceptionOnMissingSensorParserConfig() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Could not initialize parsers.  Cannot find 
configuration for sensor test.");
+
+    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+      add("test");
+    }});
+
+    parserRunner.init(() -> parserConfigurations, mock(Context.class));
+  }
+
+  @Test
+  public void executeShouldThrowExceptionOnMissingSensorParserConfig() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Could not execute parser.  Cannot find 
configuration for sensor test.");
+
+    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+      add("test");
+    }});
+
+    parserRunner.execute("test", mock(RawMessage.class), parserConfigurations);
+  }
+
+  /**
+   * After init the runner should expose the Stellar context it was given and
+   * hold one parser component per sensor, with each mock parser configured
+   * and initialized exactly once and nothing else called on it.
+   */
+  @Test
+  public void shouldInit() throws Exception {
+    Context stellarContext = mock(Context.class);
+    Map<String, Object> broParserConfig = parserConfigurations.getSensorParserConfig("bro").getParserConfig();
+    Map<String, Object> snortParserConfig = parserConfigurations.getSensorParserConfig("snort").getParserConfig();
+
+    parserRunner.init(() -> parserConfigurations, stellarContext);
+
+    // Verify Stellar context
+    Assert.assertEquals(stellarContext, parserRunner.getStellarContext());
+
+    Map<String, ParserComponent> components = parserRunner.getSensorToParserComponentMap();
+
+    // Verify Bro parser initialization
+    Assert.assertEquals(2, components.size());
+    ParserComponent bro = components.get("bro");
+    Assert.assertEquals(broParser, bro.getMessageParser());
+    Assert.assertEquals(stellarFilter, bro.getFilter());
+    verify(broParser).init();
+    verify(broParser).configure(broParserConfig);
+    verifyNoMoreInteractions(broParser);
+    verifyNoMoreInteractions(stellarFilter);
+
+    // Verify Snort parser initialization; the snort config names no filter
+    ParserComponent snort = components.get("snort");
+    Assert.assertEquals(snortParser, snort.getMessageParser());
+    Assert.assertNull(snort.getFilter());
+    verify(snortParser).init();
+    verify(snortParser).configure(snortParserConfig);
+    verifyNoMoreInteractions(snortParser);
+  }
+
+  /**
+   * Runs execute for "bro" when the mock parser returns two parsed messages
+   * plus two per-message throwables. processMessage is stubbed on the spy so
+   * the first parsed message yields a message result and the second yields an
+   * error result. The test expects one message and three errors in total.
+   */
+  @Test
+  public void shouldExecute() {
+    // Spy on the runner so processMessage can be stubbed below.
+    parserRunner = spy(parserRunner);
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new 
HashMap<>());
+    JSONObject parsedMessage1 = new JSONObject();
+    parsedMessage1.put("field", "parsedMessage1");
+    JSONObject parsedMessage2 = new JSONObject();
+    parsedMessage2.put("field", "parsedMessage2");
+    // Keys of the per-message error map handed back by the mock parser.
+    Object rawMessage1 = new RawMessage("raw_message1".getBytes(), new 
HashMap<>());
+    Object rawMessage2 = new RawMessage("raw_message2".getBytes(), new 
HashMap<>());
+    Throwable throwable1 = mock(Throwable.class);
+    Throwable throwable2 = mock(Throwable.class);
+    MessageParserResult<JSONObject> messageParserResult = new 
DefaultMessageParserResult<>(Arrays.asList(parsedMessage1, parsedMessage2),
+            new HashMap<Object, Throwable>(){{
+              put(rawMessage1, throwable1);
+              put(rawMessage2, throwable2);
+            }});
+    JSONObject processedMessage = new JSONObject();
+    processedMessage.put("field", "processedMessage1");
+    MetronError processedError = new 
MetronError().withMessage("processedError");
+    ProcessResult processedMessageResult = mock(ProcessResult.class);
+    ProcessResult processedErrorResult = mock(ProcessResult.class);
+
+    
when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult));
+    when(processedMessageResult.getMessage()).thenReturn(processedMessage);
+    when(processedErrorResult.isError()).thenReturn(true);
+    when(processedErrorResult.getError()).thenReturn(processedError);
+    // First parsed message -> message result, second parsed message -> error result.
+    doReturn(Optional.of(processedMessageResult)).when(parserRunner)
+            .processMessage("bro", parsedMessage1, rawMessage, broParser, 
parserConfigurations);
+    doReturn(Optional.of(processedErrorResult)).when(parserRunner)
+            .processMessage("bro", parsedMessage2, rawMessage, broParser, 
parserConfigurations);
+
+    // Errors expected for the two per-message throwables returned by the parser.
+    MetronError expectedParseError1 = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(throwable1)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage1);
+    MetronError expectedParseError2 = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(throwable2)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage2);
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, 
ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+    ParserRunnerResults<JSONObject> parserRunnerResults = 
parserRunner.execute("bro", rawMessage, parserConfigurations);
+
+    // One processed message; three errors = one processing error + two parse errors.
+    Assert.assertEquals(1, parserRunnerResults.getMessages().size());
+    
Assert.assertTrue(parserRunnerResults.getMessages().contains(processedMessage));
+    Assert.assertEquals(3, parserRunnerResults.getErrors().size());
+    
Assert.assertTrue(parserRunnerResults.getErrors().contains(processedError));
+    
Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError1));
+    
Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError2));
+  }
+
+  /**
+   * When the parser result carries a master throwable, execute should never
+   * call processMessage and should report a single parser error built from
+   * that throwable and the raw message bytes.
+   */
+  @Test
+  public void shouldExecuteWithMasterThrowable() {
+    parserRunner = spy(parserRunner);
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+    Throwable masterThrowable = mock(Throwable.class);
+    MessageParserResult<JSONObject> messageParserResult = new DefaultMessageParserResult<>(masterThrowable);
+
+    when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult));
+
+    // A plain HashMap avoids creating an anonymous subclass tied to this test instance.
+    parserRunner.setSensorToParserComponentMap(
+            new HashMap<>(Collections.singletonMap("bro", new ParserComponent(broParser, stellarFilter))));
+    ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute("bro", rawMessage, parserConfigurations);
+
+    verify(parserRunner, times(0))
+            .processMessage(any(), any(), any(), any(), any());
+
+    MetronError expectedError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(masterThrowable)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage.getMessage());
+    Assert.assertEquals(1, parserRunnerResults.getErrors().size());
+    Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedError));
+  }
+
+  /**
+   * A message that the filter emits and the parser validates should come back
+   * from processMessage as a non-error result. The expected output carries
+   * {@code source.type} and no longer has {@code field1}, which the bro
+   * config's REMOVE transformation targets.
+   */
+  @Test
+  public void shouldPopulateMessagesOnProcessMessage() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    inputMessage.put("ip_src_addr", "192.168.1.1");
+    inputMessage.put("ip_dst_addr", "192.168.1.2");
+    inputMessage.put("field1", "value");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("source.type", "bro");
+    expectedOutput.put("ip_src_addr", "192.168.1.1");
+    expectedOutput.put("ip_dst_addr", "192.168.1.2");
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(true);
+
+    // A plain HashMap avoids creating an anonymous subclass tied to this test instance.
+    parserRunner.setSensorToParserComponentMap(
+            new HashMap<>(Collections.singletonMap("bro", new ParserComponent(broParser, stellarFilter))));
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult =
+            parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertFalse(processResult.get().isError());
+    Assert.assertEquals(expectedOutput, processResult.get().getMessage());
+  }
+
+  /**
+   * When the parser's validate call rejects the message, processMessage
+   * should return an error result of type PARSER_INVALID for sensor "bro"
+   * that carries the input message.
+   */
+  @Test
+  public void shouldReturnMetronErrorOnInvalidMessage() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("source.type", "bro");
+    MetronError expectedMetronError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(inputMessage);
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(false);
+
+    // A plain HashMap avoids creating an anonymous subclass tied to this test instance.
+    parserRunner.setSensorToParserComponentMap(
+            new HashMap<>(Collections.singletonMap("bro", new ParserComponent(broParser, stellarFilter))));
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult =
+            parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertTrue(processResult.get().isError());
+    Assert.assertEquals(expectedMetronError, processResult.get().getError());
+  }
+
+  /**
+   * When the parser accepts the message but {@code ip_src_addr} and
+   * {@code ip_dst_addr} hold the value "test" (the global config declares an
+   * IP validation on both fields), processMessage should return a
+   * PARSER_INVALID error that lists those two fields as error fields.
+   */
+  @Test
+  public void shouldReturnMetronErrorOnFailedFieldValidator() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    inputMessage.put("ip_src_addr", "test");
+    inputMessage.put("ip_dst_addr", "test");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("ip_src_addr", "test");
+    expectedOutput.put("ip_dst_addr", "test");
+    expectedOutput.put("source.type", "bro");
+    MetronError expectedMetronError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(inputMessage)
+            .withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr", "ip_dst_addr")));
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(true);
+
+    // A plain HashMap avoids creating an anonymous subclass tied to this test instance.
+    parserRunner.setSensorToParserComponentMap(
+            new HashMap<>(Collections.singletonMap("bro", new ParserComponent(broParser, stellarFilter))));
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult =
+            parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertTrue(processResult.get().isError());
+    Assert.assertEquals(expectedMetronError, processResult.get().getError());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
index e060559..35e07f8 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
@@ -69,6 +69,9 @@ public class SampleGrokParserTest extends GrokParserTest {
   }
 
   @Override
+  public String getMultiLine() { return "false"; }
+
+  @Override
   public String getGrokPath() {
     return "../metron-integration-test/src/main/sample/patterns/test";
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
index 93c8276..cb424fb 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
@@ -73,6 +73,8 @@ public class SquidParserTest extends GrokParserTest {
 
   }
 
+  @Override
+  public String getMultiLine() { return "false"; }
 
   @Override
   public String getGrokPath() {

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
index 8dd75a0..15ce19f 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
@@ -69,6 +69,9 @@ public class YafParserTest extends GrokParserTest {
   }
 
   @Override
+  public String getMultiLine() { return "false"; } // returns the literal string "false"; presumably the multi-line flag GrokParserTest feeds into the parser config - TODO confirm
+
+  @Override
   public String getGrokPath() {
     return "../metron-parsers/src/main/resources/patterns/yaf";
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 06f4cec..9f58d1c 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,707 +17,428 @@
  */
 package org.apache.metron.parsers.bolt;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.parsers.BasicParser;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.DefaultParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.stellar.dsl.Context;
+import 
org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.Config;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
 
-public class ParserBoltTest extends BaseBoltTest {
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Supplier;
 
-  @Mock
-  private MessageParser<JSONObject> parser;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-  @Mock
-  private MessageWriter<JSONObject> writer;
+public class ParserBoltTest extends BaseBoltTest {
 
-  @Mock
-  private BulkMessageWriter<JSONObject> batchWriter;
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
 
   @Mock
-  private MessageFilter<JSONObject> filter;
+  private Tuple t1;
 
   @Mock
-  private Tuple t1;
+  private ParserRunnerImpl parserRunner;
 
   @Mock
-  private Tuple t2;
+  private WriterHandler writerHandler;
 
   @Mock
-  private Tuple t3;
+  private WriterHandler writerHandlerHandleAck;
 
   @Mock
-  private Tuple t4;
+  private MessageGetStrategy messageGetStrategy;
 
   @Mock
-  private Tuple t5;
+  private Context stellarContext;
 
-  private static class RecordingWriter implements 
BulkMessageWriter<JSONObject> {
-    List<JSONObject> records = new ArrayList<>();
+  private class MockParserRunner extends ParserRunnerImpl { // hand-written test double: execute() records its RawMessage argument and returns either a canned message or a PARSER_INVALID error
 
-    @Override
-    public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config) throws Exception {
+    private boolean isInvalid = false; // when true, execute() reports an error instead of a message
+    private RawMessage rawMessage; // last RawMessage handed to execute(); read back via getRawMessage()
+    private JSONObject message; // canned payload: returned as the result, or attached to the error as its raw message
 
+    public MockParserRunner(HashSet<String> sensorTypes) { // forwards the sensor types to the ParserRunnerImpl constructor
+      super(sensorTypes);
     }
 
     @Override
-    public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws 
Exception {
-      records.addAll(messages);
-      BulkWriterResponse ret = new BulkWriterResponse();
-      ret.addAllSuccesses(tuples);
-      return ret;
+    public ParserRunnerResults<JSONObject> execute(String sensorType, 
RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+      DefaultParserRunnerResults parserRunnerResults = new 
DefaultParserRunnerResults();
+      this.rawMessage = rawMessage; // captured so tests can assert on what the bolt passed in
+      if (!isInvalid) { // success path: hand back the canned message unchanged
+        parserRunnerResults.addMessage(message);
+      } else { // failure path: wrap the canned message in a PARSER_INVALID error for this sensor type
+        MetronError error = new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(message);
+        parserRunnerResults.addError(error);
+      }
+      return parserRunnerResults; // parserConfigurations is not consulted by this double
     }
 
-    @Override
-    public String getName() {
-      return "recording";
+    protected void setInvalid(boolean isInvalid) { // switches execute() between the success and error paths
+      this.isInvalid = isInvalid;
     }
 
-    @Override
-    public void close() throws Exception {
-
+    protected void setMessage(JSONObject message) { // sets the canned payload used by execute()
+      this.message = message;
     }
 
-    public List<JSONObject> getRecords() {
-      return records;
+    protected RawMessage getRawMessage() { // returns the RawMessage seen by the most recent execute() call (null before any call)
+      return rawMessage;
     }
   }
 
-  private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-    return createUpdater(Optional.empty());
-  }
-  private static ConfigurationsUpdater<ParserConfigurations> 
createUpdater(Optional<Integer> batchSize) {
-    return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
-      @Override
-      public void update(CuratorFramework client, String path, byte[] data) 
throws IOException { }
-
-      @Override
-      public void delete(CuratorFramework client, String path, byte[] data) 
throws IOException { }
+  @Before
+  public void setup() { // stubs the two WriterHandler mocks so that they disagree on handleAck()
+    when(writerHandler.handleAck()).thenReturn(false); // writerHandler reports handleAck() == false
+    when(writerHandlerHandleAck.handleAck()).thenReturn(true); // writerHandlerHandleAck reports handleAck() == true
 
-      @Override
-      public ConfigurationType getType() {
-        return ConfigurationType.PARSER;
-      }
-
-      @Override
-      public void update(String name, byte[] data) throws IOException { }
-
-      @Override
-      public void delete(String name) { }
-
-      @Override
-      public Class<ParserConfigurations> getConfigurationClass() {
-        return ParserConfigurations.class;
-      }
-
-      @Override
-      public void forceUpdate(CuratorFramework client) { }
-
-      @Override
-      public ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  if(batchSize.isPresent()) {
-                    put(IndexingConfigurations.BATCH_SIZE_CONF, 
batchSize.get());
-                  }
-                }};
-              }
-            };
-          }
-        };
-      }
-    };
   }
 
-
   @Test
-  public void testEmpty() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
-      }
-    };
-
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(writer, times(1)).init();
-    byte[] sampleBinary = "some binary message".getBytes();
-
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parseOptional(sampleBinary)).thenReturn(null);
-    parserBolt.execute(tuple);
-    verify(parser, times(0)).validate(any());
-    verify(writer, times(0)).write(eq(sensorType), 
any(ParserWriterConfiguration.class), eq(tuple), any());
-    verify(outputCollector, times(1)).ack(tuple);
-
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.PARSER_ERROR)
-            .withThrowable(new NullPointerException())
-            .withSensorType(Collections.singleton(sensorType))
-            .addRawMessage(sampleBinary);
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), 
argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+  public void shouldThrowExceptionOnDifferentHandleAck() { // constructing a ParserBolt whose writers disagree on handleAck() must fail
+    exception.expect(IllegalArgumentException.class); // expectations are registered before the constructor call that should throw
+    exception.expectMessage("All writers must match when calling handleAck()");
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler); // handleAck() stubbed to false in setup()
+      put("bro", writerHandlerHandleAck); // handleAck() stubbed to true in setup()
+    }});
   }
 
   @Test
-  public void testInvalid() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
-      }
-    };
+  public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() { // the divisor given to withBatchTimeoutDivisor() must be readable through getBatchTimeoutDivisor()
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}).withBatchTimeoutDivisor(5); // the call's result is assigned as the ParserBolt under test
 
-    buildGlobalConfig(parserBolt);
+    Assert.assertEquals(5, parserBolt.getBatchTimeoutDivisor());
+  }
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    byte[] sampleBinary = "some binary message".getBytes();
-
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    JSONObject parsedMessage = new JSONObject();
-    parsedMessage.put("field", "invalidValue");
-    parsedMessage.put("guid", "this-is-unique-identifier-for-tuple");
-    List<JSONObject> messageList = new ArrayList<>();
-    messageList.add(parsedMessage);
-    
when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messageList));
-    when(parser.validate(parsedMessage)).thenReturn(true);
-    parserBolt.execute(tuple);
+  @Test
+  public void shouldThrowExceptionOnInvalidBatchTimeoutDivisor() { // a negative batch timeout divisor must be rejected with IllegalArgumentException
+    exception.expect(IllegalArgumentException.class); // expectations are registered before the call that should throw
+    exception.expectMessage("batchTimeoutDivisor must be positive. Value 
provided was -1");
 
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.PARSER_INVALID)
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorFields(new HashSet<String>() {{ add("field"); }})
-            .addRawMessage(new JSONObject(){{
-              put("field", "invalidValue");
-              put("source.type", "yaf");
-              put("guid", "this-is-unique-identifier-for-tuple");
-            }});
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), 
argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}).withBatchTimeoutDivisor(-1); // -1 is the invalid value echoed in the expected message
   }
 
   @Test
-  public void test() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldGetComponentConfiguration() { // getComponentConfiguration() should expose exactly one entry: the tick-tuple frequency
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() { // override builds a fresh configuration holding only a "yaf" parser config
+        ParserConfigurations configurations = new ParserConfigurations();
+        SensorParserConfig sensorParserConfig = new SensorParserConfig();
+        sensorParserConfig.setParserConfig(new HashMap<String, Object>() {{
+            put(IndexingConfigurations.BATCH_SIZE_CONF, 10); // batch size 10 for the "yaf" sensor
+        }});
+        configurations.updateSensorParserConfig("yaf", sensorParserConfig);
+        return configurations;
       }
     };
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(writer, times(1)).init();
-    byte[] sampleBinary = "some binary message".getBytes();
-    JSONParser jsonParser = new JSONParser();
-    final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ 
\"field1\":\"value1\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ 
\"field2\":\"value2\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    List<JSONObject> messages = new ArrayList<JSONObject>() {{
-      add(sampleMessage1);
-      add(sampleMessage2);
-    }};
-    final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ 
\"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": 
\"this-is-unique-identifier-for-tuple\" }");
-    final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ 
\"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": 
\"this-is-unique-identifier-for-tuple\" }");
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messages));
-    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
-    when(parser.validate(eq(messages.get(1)))).thenReturn(false);
-    parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), 
any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage1));
-    verify(outputCollector, times(1)).ack(tuple);
-    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
-    when(parser.validate(eq(messages.get(1)))).thenReturn(true);
-    when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
-    when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
-    parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), 
any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
-    verify(outputCollector, times(2)).ack(tuple);
-    doThrow(new Exception()).when(writer).write(eq(sensorType), 
any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
-    parserBolt.execute(tuple);
-    verify(outputCollector, times(1)).reportError(any(Throwable.class));
-  }
-
-  /**
-   {
-    "filterClassName" : "STELLAR"
-   ,"parserConfig" : {
-    "filter.query" : "exists(field1)"
-    }
-   }
-   */
-  @Multiline
-  public static String sensorParserConfig;
-
-  /**
-   * Tests to ensure that a message that is unfiltered results in one write 
and an ack.
-   * @throws Exception
-   */
-  @Test
-  public void testFilterSuccess() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    BulkWriterResponse successResponse = mock(BulkWriterResponse.class);
-    when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
-    when(batchWriter.write(any(), any(), any(), 
any())).thenReturn(successResponse);
-    when(parser.validate(any())).thenReturn(true);
-    
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject(new HashMap<String, Object>() {{
-      put("field1", "blah");
-    }}))));
-    parserBolt.execute(t1);
-    verify(batchWriter, times(1)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
+    Map<String, Object> componentConfiguration = 
parserBolt.getComponentConfiguration();
+    Assert.assertEquals(1, componentConfiguration.size()); // no other keys are expected in the component configuration
+    Assert.assertEquals( 14, 
componentConfiguration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)); // NOTE(review): 14 presumably derives from the bolt's default batch timeout and divisor - confirm against ParserBolt
   }
 
-
-  /**
-   * Tests to ensure that a message filtered out results in no writes, but an 
ack.
-   * @throws Exception
-   */
   @Test
-  public void testFilterFailure() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldPrepare() { // prepare() should init the runner and sensor config, build the topic-to-sensor map, and init the writer handler
+    Map stormConf = mock(Map.class);
+    SensorParserConfig yafConfig = mock(SensorParserConfig.class);
+    when(yafConfig.getSensorTopic()).thenReturn("yafTopic"); // topic expected to map back to "yaf" in the assertions below
+    when(yafConfig.getParserConfig()).thenReturn(new HashMap<String, Object>() 
{{
+      put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
+    }});
+    ParserConfigurations parserConfigurations = 
mock(ParserConfigurations.class);
+
+    ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", parserRunner, 
new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
       protected SensorParserConfig getSensorParserConfig(String sensorType) {
-        try {
-          return 
SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+        if ("yaf".equals(sensorType)) { // only "yaf" has a config; every other sensor type gets null
+          return yafConfig;
         }
+        return null;
       }
 
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() { // hands the bolt the mocked ParserConfigurations
+        return parserConfigurations;
       }
-    };
+    });
+    doReturn(stellarContext).when(parserBolt).initializeStellar(); // spy stub: initializeStellar() yields the mocked Context
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject(new HashMap<String, Object>() {{
-      put("field2", "blah");
-    }}))));
-    parserBolt.execute(t1);
-    verify(batchWriter, times(0)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
-  }
-  /**
-  {
-     "sensorTopic":"dummy"
-     ,"parserConfig": {
-      "batchSize" : 1
-     }
-      ,"fieldTransformations" : [
-          {
-           "transformation" : "STELLAR"
-          ,"output" : "timestamp"
-          ,"config" : {
-            "timestamp" : "TO_EPOCH_TIMESTAMP(timestampstr, 'yyyy-MM-dd 
HH:mm:ss', 'UTC')"
-                      }
-          }
-                               ]
-   }
-   */
-  @Multiline
-  public static String csvWithFieldTransformations;
 
-  @Test
-  public void testFieldTransformationPriorToValidation() {
-    String sensorType = "dummy";
-    RecordingWriter recordingWriter = new RecordingWriter();
-    //create a parser which acts like a basic parser but returns no timestamp 
field.
-    BasicParser dummyParser = new BasicParser() {
-      @Override
-      public void init() {
+    parserBolt.prepare(stormConf, topologyContext, outputCollector); // call under test
 
-      }
+    verify(parserRunner, times(1)).init(any(Supplier.class), 
eq(stellarContext));
+    verify(yafConfig, times(1)).init(); // the sensor config must be initialised exactly once
+    Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap();
+    Assert.assertEquals(1, topicToSensorMap.size());
+    Assert.assertEquals("yaf", topicToSensorMap.get("yafTopic")); // keyed by the stubbed sensor topic
+    verify(writerHandler).init(stormConf, topologyContext, outputCollector, 
parserConfigurations);
+    verify(writerHandler).setDefaultBatchTimeout(14); // NOTE(review): 14 presumably derives from the bolt's default batch timeout and divisor - confirm against ParserBolt
+  }
 
-      @Override
-      public List<JSONObject> parse(byte[] rawMessage) {
-        return ImmutableList.of(new JSONObject() {{
-                put("data", "foo");
-                put("timestampstr", "2016-01-05 17:02:30");
-                put("original_string", "blah");
-              }});
-      }
+  @Test
+  public void shouldThrowExceptionOnMissingConfig() { // prepare() must fail with IllegalStateException when no parser config can be retrieved for "yaf"
+    exception.expect(IllegalStateException.class); // expectations are registered before the prepare() call that should throw
+    exception.expectMessage("Unable to retrieve a parser config for yaf");
 
-      @Override
-      public void configure(Map<String, Object> config) {
+    Map stormConf = mock(Map.class);
 
-      }
-    };
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            dummyParser,
-            null,
-            new WriterHandler(recordingWriter)
-        )
-    );
-    ParserBolt parserBolt = buildParserBolt(parserMap, 
csvWithFieldTransformations);
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}); // unlike the other tests, no configuration accessors are overridden on this bolt
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    when(t1.getBinary(0)).thenReturn(new byte[] {});
-    parserBolt.execute(t1);
-    Assert.assertEquals(1, recordingWriter.getRecords().size());
-    long expected = 1452013350000L;
-    Assert.assertEquals(expected, 
recordingWriter.getRecords().get(0).get("timestamp"));
+
+    parserBolt.prepare(stormConf, topologyContext, outputCollector); // expected to throw
   }
 
+
   @Test
-  public void testDefaultBatchSize() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void executeShouldHandleTickTuple() throws Exception { // executing a tick tuple should flush the writer handler once and ack the tuple
+    when(t1.getSourceComponent()).thenReturn("__system"); // presumably the component/stream pair the bolt checks to recognise a Storm tick tuple - confirm
+    when(t1.getSourceStreamId()).thenReturn("__tick");
+    ParserConfigurations parserConfigurations = 
mock(ParserConfigurations.class);
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        // this uses default batch size
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() { // hands the bolt the mocked ParserConfigurations
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    BulkWriterResponse response = new BulkWriterResponse();
-    Tuple[] uniqueTuples = new 
Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE];
-    for (int i=0; i < uniqueTuples.length; i++) {
-      uniqueTuples[i] = mock(Tuple.class);
-      response.addSuccess(uniqueTuples[i]);
-    }
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), 
eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
-    for (Tuple tuple : uniqueTuples) {
-      parserBolt.execute(tuple);
-    }
-    for (Tuple uniqueTuple : uniqueTuples) {
-      verify(outputCollector, times(1)).ack(uniqueTuple);
-    }
+    parserBolt.setMessageGetStrategy(messageGetStrategy); // collaborators are injected directly; prepare() is not called in this test
+    parserBolt.setOutputCollector(outputCollector);
+
+    parserBolt.execute(t1); // call under test
+
+    verify(writerHandler, times(1)).flush(parserConfigurations, 
messageGetStrategy);
+    verify(outputCollector, times(1)).ack(t1); // the tick tuple itself must be acked
   }
 
   @Test
-  public void testLessRecordsThanDefaultBatchSize() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldExecuteOnSuccess() throws Exception { // a successfully parsed message is written once; the bolt acks only when the writer reports handleAck() == false
+    
when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8)); // raw bytes returned for t1
+    
when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic"); // topic mapped to "yaf" by setTopicToSensorMap below
+    MockParserRunner mockParserRunner = new MockParserRunner(new 
HashSet<String>() {{ add("yaf"); }});
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new 
SensorParserConfig());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, 
new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        // this uses default batch size
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() { // hands the bolt the locally built ParserConfigurations
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    int oneLessThanDefaultBatchSize = 
ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1;
-    BulkWriterResponse response = new BulkWriterResponse();
-    Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize];
-    for (int i=0; i < uniqueTuples.length; i++) {
-      uniqueTuples[i] = mock(Tuple.class);
-      response.addSuccess(uniqueTuples[i]);
+    parserBolt.setMessageGetStrategy(messageGetStrategy); // collaborators are injected directly; prepare() is not called in this test
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message); // canned parse result returned by MockParserRunner.execute()
+    RawMessage expectedRawMessage = new 
RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>()); // same bytes as stubbed above, with an empty metadata map
+
+    {
+      // Verify the correct message is written and ack is handled
+      parserBolt.execute(t1);
+
+      Assert.assertEquals(expectedRawMessage, 
mockParserRunner.getRawMessage());
+      verify(writerHandler, times(1)).write("yaf", t1, message, 
parserConfigurations, messageGetStrategy);
+      verify(outputCollector, times(1)).ack(t1); // writerHandler.handleAck() is false, so the ack is expected on the collector
     }
-    for (Tuple tuple : uniqueTuples) {
-      parserBolt.execute(tuple);
+    {
+      // Verify the tuple is not acked when the writer is set to handle ack
+      reset(outputCollector); // clears the ack recorded by the first execute()
+      parserBolt.setSensorToWriterMap(new HashMap<String, WriterHandler>() {{
+        put("yaf", writerHandlerHandleAck); // swap in the handler whose handleAck() is stubbed to true
+      }});
+
+      parserBolt.execute(t1);
+
+      verify(writerHandlerHandleAck, times(1)).write("yaf", t1, message, 
parserConfigurations, messageGetStrategy);
+      verify(outputCollector, times(0)).ack(t1);
     }
-    // should have no acking yet - batch size not fulfilled
-    verify(outputCollector, never()).ack(any(Tuple.class));
-    response.addSuccess(t1); // used to achieve count in final verify
-    Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{
-      add(t1);
-    }};
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), 
eq(tuples), any())).thenReturn(response);
-    // meet batch size requirement and now it should ack
-    parserBolt.execute(t1);
-    verify(outputCollector, 
times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class));
   }
 
   @Test
-  public void testBatchOfOne() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldExecuteOnError() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new 
HashSet<String>() {{
+      add("yaf");
+    }});
+    mockParserRunner.setInvalid(true);
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new 
SensorParserConfig());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, 
new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addSuccess(t1);
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), 
eq(Collections.singleton(t1)), any())).thenReturn(response);
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message);
+    RawMessage expectedRawMessage = new 
RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage(message);
+
     parserBolt.execute(t1);
+
+    Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
     verify(outputCollector, times(1)).ack(t1);
+
   }
 
   @Test
-  public void testBatchOfFive() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(5));
-      }
-    } ;
-
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    Set<Tuple> tuples = Stream.of(t1, t2, t3, t4, 
t5).collect(Collectors.toSet());
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tuples);
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), 
eq(tuples), any())).thenReturn(response);
-    writeNonBatch(outputCollector, parserBolt, t1);
-    writeNonBatch(outputCollector, parserBolt, t2);
-    writeNonBatch(outputCollector, parserBolt, t3);
-    writeNonBatch(outputCollector, parserBolt, t4);
-    parserBolt.execute(t5);
-    verify(batchWriter, times(1)).write(eq(sensorType), 
any(WriterConfiguration.class), eq(tuples), any());
-    verify(outputCollector, times(1)).ack(t1);
-    verify(outputCollector, times(1)).ack(t2);
-    verify(outputCollector, times(1)).ack(t3);
-    verify(outputCollector, times(1)).ack(t4);
-    verify(outputCollector, times(1)).ack(t5);
+  public void shouldThrowExceptionOnFailedExecute() {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
 
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new 
SensorParserConfig());
+    doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations));
 
-  }
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new 
HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
 
-  @Test
-  public void testBatchOfFiveWithError() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(5));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-
-    doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), 
any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    parserBolt.execute(t1);
-    parserBolt.execute(t2);
-    parserBolt.execute(t3);
-    parserBolt.execute(t4);
-    parserBolt.execute(t5);
-    verify(batchWriter, times(1)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
-    verify(outputCollector, times(1)).ack(t2);
-    verify(outputCollector, times(1)).ack(t3);
-    verify(outputCollector, times(1)).ack(t4);
-    verify(outputCollector, times(1)).ack(t5);
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(new IllegalStateException("parserRunner.execute failed"))
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
 
-  }
+    parserBolt.execute(t1);
 
-  protected void buildGlobalConfig(ParserBolt parserBolt) {
-    HashMap<String, Object> globalConfig = new HashMap<>();
-    Map<String, Object> fieldValidation = new HashMap<>();
-    fieldValidation.put("input", Arrays.asList("field"));
-    fieldValidation.put("validation", "STELLAR");
-    fieldValidation.put("config", new HashMap<String, String>(){{ 
put("condition", "field != 'invalidValue'"); }});
-    globalConfig.put("fieldValidations", Arrays.asList(fieldValidation));
-    parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(outputCollector, 
times(1)).reportError(any(IllegalStateException.class));
+    verify(outputCollector, times(1)).ack(t1);
   }
 
-  private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
-      String csvWithFieldTransformations) {
-    return new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig(String sensorType) {
-        try {
-          return 
SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
+  @Test
+  public void shouldThrowExceptionOnFailedWrite() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new 
HashSet<String>() {{ add("yaf"); }});
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new 
SensorParserConfig());
+    doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any(), any(), any());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, 
new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
 
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
-  }
 
-  private static void writeNonBatch(OutputCollector collector, ParserBolt 
bolt, Tuple t) {
-    bolt.execute(t);
-  }
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message);
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(new IllegalStateException("write failed"))
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
+
+    parserBolt.execute(t1);
 
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(outputCollector, 
times(1)).reportError(any(IllegalStateException.class));
+    verify(outputCollector, times(1)).ack(t1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index 2cba40a..31d87a7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -25,21 +25,20 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
@@ -83,41 +82,13 @@ public class ParserDriver implements Serializable {
     List<byte[]> errors = new ArrayList<>();
 
     public ShimParserBolt(List<byte[]> output) {
-      super(null
-          , Collections.singletonMap(
-              sensorType == null ? config.getSensorTopic() : sensorType,
-              new ParserComponents(
-              ReflectionUtils.createInstance(config.getParserClassName()),
-                  null,
-                  new WriterHandler(new CollectingWriter(output))
-              )
-         )
-      );
+      super(null, parserRunner, Collections.singletonMap(sensorType, new 
WriterHandler(new CollectingWriter(output))));
       this.output = output;
-      Map<String, ParserComponents> sensorToComponentMap = 
getSensorToComponentMap();
-      for(Entry<String, ParserComponents> sensorToComponents : 
sensorToComponentMap.entrySet()) {
-        sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
-      }
     }
 
     @Override
     public ParserConfigurations getConfigurations() {
-      return new ParserConfigurations() {
-        @Override
-        public SensorParserConfig getSensorParserConfig(String sensorType) {
-          return config;
-        }
-
-        @Override
-        public Map<String, Object> getGlobalConfig() {
-          return globalConfig;
-        }
-
-        @Override
-        public List<FieldValidator> getFieldValidations() {
-          return new ArrayList<>();
-        }
-      };
+      return config;
     }
 
     @Override
@@ -125,28 +96,34 @@ public class ParserDriver implements Serializable {
     }
 
     @Override
-    protected void handleError(byte[] originalMessage, Tuple tuple, Throwable 
ex, OutputCollector collector) {
+    protected void handleError(String sensorType, byte[] originalMessage, 
Tuple tuple, Throwable ex, OutputCollector collector) {
       errors.add(originalMessage);
       LOG.error("Error parsing message: " + ex.getMessage(), ex);
     }
 
+    @SuppressWarnings("unchecked")
     public ProcessorResult<List<byte[]>> getResults() {
       return new 
ProcessorResult.Builder<List<byte[]>>().withProcessErrors(errors)
                                                         .withResult(output)
                                                         .build();
-
     }
   }
 
 
-  private SensorParserConfig config;
-  private Map<String, Object> globalConfig;
+  private ParserConfigurations config;
   private String sensorType;
+  private ParserRunner parserRunner;
 
   public ParserDriver(String sensorType, String parserConfig, String 
globalConfig) throws IOException {
-    config = SensorParserConfig.fromBytes(parserConfig.getBytes());
-    this.sensorType = sensorType;
-    this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, 
JSONUtils.MAP_SUPPLIER);
+    SensorParserConfig sensorParserConfig = 
SensorParserConfig.fromBytes(parserConfig.getBytes());
+    this.sensorType = sensorType == null ? sensorParserConfig.getSensorTopic() 
: sensorType;
+    config = new ParserConfigurations();
+    config.updateSensorParserConfig(this.sensorType, 
SensorParserConfig.fromBytes(parserConfig.getBytes()));
+    config.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfig, 
JSONUtils.MAP_SUPPLIER));
+
+    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+      add(sensorType);
+    }});
   }
 
   public ProcessorResult<List<byte[]>> run(Iterable<byte[]> in) {
@@ -163,6 +140,7 @@ public class ParserDriver implements Serializable {
 
   public Tuple toTuple(byte[] record) {
     Tuple ret = mock(Tuple.class);
+    when(ret.getStringByField("topic")).thenReturn(sensorType);
     when(ret.getBinary(eq(0))).thenReturn(record);
     return ret;
   }

Reply via email to